AWS EMR - Submitting Spark Jobs

In this module we will see the end-to-end life cycle of developing Spark-based applications and deploying them using AWS EMR.


Prerequisites

As we will be using EMR to deploy the application, there are some prerequisites which we need to keep in mind.

  • Valid AWS Account
  • Valid s3 Buckets or Directories
    • Input Directory from which data will be sourced.
    • Output Directory in which processed data will be persisted.
    • Application and other 3rd party jars
  • Both input and output will be stored in the s3 bucket s3://itversitydata under separate sub folders or directories.
  • We will be using retail data which can be cloned and uploaded from here.
  • As we will be processing retail data, let us ensure that the retail data is already uploaded into s3.
  • Application and other 3rd party jars will be uploaded to the S3 bucket called s3://bdclouddemo (see the CLI sketch after this list).
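
For reference, here is a minimal sketch of how the buckets and data could be staged using the AWS CLI. The bucket names are the ones used in this module (bucket names have to be globally unique, so yours will differ), and the local path of the cloned retail data is an assumption.

# create the buckets for data and jars (names used in this module; replace with your own)
aws s3 mb s3://itversitydata
aws s3 mb s3://bdclouddemo

# upload the retail data set (assumes it was cloned to ~/retail_db)
aws s3 cp ~/retail_db/orders s3://itversitydata/retail_db/orders --recursive
aws s3 cp ~/retail_db/order_items s3://itversitydata/retail_db/order_items --recursive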

Recap of Application Development

Following are the steps involved in developing the Spark application; a quick sketch to verify the local setup follows the list.

  • Setup Development Environment
    • Local Spark Setup
    • IDE such as IntelliJ
    • Scala Plugin
    • Build tools such as sbt
  • Define Problem Statement
  • Develop Application using appropriate APIs
  • Validate locally using IDE
  • Build the Jar file and ship to the cluster to deploy
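
Before moving on, a quick way to confirm that the local environment is in place is to check the tool versions from the terminal. A minimal sketch, assuming Spark, Scala, and sbt are already on the PATH:

spark-submit --version   # local Spark setup
scala -version           # Scala installation
sbt sbtVersion           # build tool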

Define Problem Statement

Let us develop an application to compute daily product revenue for complete or closed orders.

  • We will use Data Frame APIs to develop the application that computes daily product revenue.
    • Filter for COMPLETE or CLOSED orders
    • Join orders and order_items using order_id
    • Group by order_date and order_item_product_id, then compute daily product revenue using order_item_subtotal.
  • By now you should have a good idea of how to filter, join, and aggregate data using the appropriate Data Frame APIs.

Setup Project and Dependencies

Let us go ahead and set up the project and add the required dependencies.

  • We will be creating a new application named spark2demo.
  • Let us make sure appropriate versions of Scala and Spark are defined as dependencies as part of build.sbt. Also, we will use a 3rd party library (Typesafe Config) to externalize properties such as input base directory, output base directory, etc.
  • Make sure the Spark version defined here matches the version deployed on the target production environment where the job will eventually run.
name := "spark2demo"

version := "0.1"

scalaVersion := "2.11.12"
libraryDependencies += "com.typesafe" % "config" % "1.3.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"
  • Let us define application.properties under src/main/resources to define environment specific properties.
  • Make sure to change this as per locations specific to environments you are running in.
dev.execution.mode = local
dev.input.base.dir = /Users/itversity/Research/data/retail_db
dev.output.base.dir = /Users/itversity/Research/data/emrdemo/retail_db

prod.execution.mode = yarn-client
prod.input.base.dir = s3://itversitydata/retail_db
prod.output.base.dir = s3://itversitydata/emrdemo/retail_db

Develop Application

Now let us develop the code to compute daily product revenue.

package retail_db

import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
  * Created by itversity on 10/08/18.
  */
object GetDailyProductRevenue {
  def main(args: Array[String]): Unit = {
    // Load environment-specific properties (dev or prod) based on the first argument
    val props = ConfigFactory.load()
    val envProps = props.getConfig(args(0))

    val spark = SparkSession.
      builder.
      appName("Daily Product Revenue").
      master(envProps.getString("execution.mode")).
      getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")
    spark.conf.set("spark.sql.shuffle.partitions", "2")

    import spark.implicits._

    // Read orders and order_items as CSV using explicit schemas
    val inputBaseDir = envProps.getString("input.base.dir")
    val orders = spark.
      read.
      schema("""
          order_id INT,
          order_date STRING,
          order_customer_id INT,
          order_status STRING
          """).
      csv(inputBaseDir + "/orders")

    val orderItems = spark.
      read.
      schema("""
          order_item_id INT,
          order_item_order_id INT,
          order_item_product_id INT,
          order_item_quantity INT,
          order_item_subtotal FLOAT,
          order_item_product_price FLOAT
          """).
      csv(inputBaseDir + "/order_items")

    // Filter for COMPLETE or CLOSED orders, join with order items,
    // then aggregate order_item_subtotal by date and product
    val dailyProductRevenue = orders.where("order_status in ('CLOSED', 'COMPLETE')").
      join(orderItems, $"order_id" === $"order_item_order_id").
      groupBy("order_date", "order_item_product_id").
      agg(round(sum($"order_item_subtotal"), 2).alias("revenue")).
      orderBy($"order_date", $"revenue".desc)

    val outputBaseDir = envProps.getString("output.base.dir")
    dailyProductRevenue.
      write.
      mode("overwrite").
      json(outputBaseDir + "/daily_product_revenue")
  }
}

Validating using IDE

Once the development is done, we can validate locally by running the application from an IDE such as IntelliJ.

  • Run the application - it will fail the first time as we have not yet passed the program argument.
  • Go to Run -> Edit Configurations and pass the argument dev.
  • Make sure program exits with code 0.
  • Also go to the output location and confirm that files are created.

Build and Validate Jar

If we have Spark set up locally, we should be able to use the spark-submit command to validate that our jar runs without any issues.

  • Go to the working directory of the project using Terminal.

  • Build jar file using sbt package.

  • The jar file will be created under target/scala-2.11 with a name based on our project name and version.

  • Here is the command to run the application using the jar file.

spark-submit \
  --class retail_db.GetDailyProductRevenue \
  --packages com.typesafe:config:1.3.2 \
  target/scala-2.11/spark2demo_2.11-0.1.jar dev

Uploading jars to s3

We can keep the jars associated with our application in s3 to run the application using Step Execution or even directly on the master node of the EMR cluster. Let us go ahead and upload the jar file of the application as well as the external dependencies to s3; a CLI sketch follows the list below.

  • Make sure you have a bucket for the jars related to application - in my case it is bdclouddemo.
  • Upload the jar file that is generated with sbt package to the s3 bucket, bdclouddemo.
  • As our application uses an external dependency for externalizing properties (typesafe config), make sure to upload the jar related to typesafe config to s3 as well.
  • EMR Clusters are ephemeral, which means they do not maintain state and might get terminated at any point in time. For that reason, both the input and output directories for our application should be in s3.
  • Create the required directories and make sure the data is uploaded into s3 as per the paths that are defined in application.properties.
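
As an illustration, the uploads can also be done with the AWS CLI. A minimal sketch, assuming the application jar was built with sbt package and the typesafe config jar is available in the local ivy cache (the exact cache path may vary from machine to machine):

# application jar built using sbt package
aws s3 cp target/scala-2.11/spark2demo_2.11-0.1.jar s3://bdclouddemo/

# typesafe config jar (cache location below is an assumption; locate it in your ivy/coursier cache)
aws s3 cp ~/.ivy2/cache/com.typesafe/config/bundles/config-1.3.2.jar \
  s3://bdclouddemo/com.typesafe_config-1.3.2.jar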

Recap of Setting up EMR Cluster

Let us recap setting up an EMR Cluster; an equivalent AWS CLI sketch follows the list.

  • We can setup EMR Cluster either by using Quick Options or Advanced Options.
  • As part of the Quick Options, we should be able to choose the following:
    • Log Location (Typically s3)
    • EMR Version
    • Limited options for the group of applications such as Core Hadoop, Spark etc.
    • Instance Type - we have to choose the same Instance Type for both Masters and Slaves.
    • Only one Master, which will have Namenode, Resource Manager, Gateway, etc. depending on the services chosen.
    • Number of Instances
  • As part of the Advanced Options, we have more flexibility while setting up the cluster.
    • Log Location (Typically s3)
    • EMR Version
    • Ability to choose custom application from the list available with a given version of EMR.
    • Ability to have multiple masters. We can also choose High Availability for critical master components such as Namenode.
    • We can choose a different type of instances for Masters and Workers.
    • Workers are divided into Core and Task Instances. We can enable Auto Scaling features with Task Instances.
    • We can also choose between Uniform Instance Groups or Instance Fleets for Worker Nodes.
  • We can either submit applications using Step Execution or run the applications once the cluster is setup.
    • In Quick Options with Step Execution, Cluster will be automatically terminated once all the steps are completed.
    • In Advanced Options with Step Execution, we can choose to have Cluster in waiting state. In waiting state, one should be able to submit or run ad hoc queries or applications.
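
For reference, a comparable Spark cluster can also be brought up from the AWS CLI instead of the console. A minimal sketch, where the cluster name, EMR release label, instance type, key pair, and log location are placeholder values to be replaced with your own:

aws emr create-cluster \
  --name "Spark Demo Cluster" \
  --release-label emr-5.27.0 \
  --applications Name=Spark \
  --instance-type m5.xlarge \
  --instance-count 3 \
  --use-default-roles \
  --ec2-attributes KeyName=my-key-pair \
  --log-uri s3://bdclouddemo/logs/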

Deploying Jar - Step Execution

Let us understand how to deploy Scala based Spark Application using Step Execution as part of the EMR cluster created using Quick Options.

  • Keep in mind that the steps to deploy the jar as part of Step Execution are exactly the same when the EMR Cluster is created using Advanced Options.
  • Following are the important details to keep in mind while deploying the Spark application as part of Step Execution (an equivalent AWS CLI command is sketched after this list).
    • Deploy Mode (Cluster or Client)
    • Application Location - location of the jar file, which we have copied to s3 - s3://bdclouddemo/spark2demo_2.11-0.1.jar
    • Arguments - Application Arguments - prod
    • Spark Submit Options
--class retail_db.GetDailyProductRevenue
--jars s3://bdclouddemo/com.typesafe_config-1.3.2.jar
--master yarn
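
The same Spark step can also be added to a running cluster from the AWS CLI. A minimal sketch, where the cluster id is a placeholder and the arguments mirror the Spark Submit Options and application details above:

aws emr add-steps \
  --cluster-id j-XXXXXXXXXXXXX \
  --steps 'Type=Spark,Name=Daily Product Revenue,ActionOnFailure=CONTINUE,Args=[--class,retail_db.GetDailyProductRevenue,--jars,s3://bdclouddemo/com.typesafe_config-1.3.2.jar,--master,yarn,s3://bdclouddemo/spark2demo_2.11-0.1.jar,prod]'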

Reviewing Job Logs

Let us understand how to review the job logs when the job is submitted to the EMR Cluster using Step Execution and Quick Options.

  • The cluster will be terminated once the job runs successfully.
  • However, the logs will be available in s3 and we should be able to review them to troubleshoot any kind of issues (see the sketch after this list).
  • If we choose to keep the cluster waiting in case a step fails, then the EMR cluster will be up and running and we should be able to go through the logs on the live cluster.
  • In some cases we might have to clean up the files and directories created and rerun the job.
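
As an illustration, the step logs can also be pulled directly from s3 using the CLI. A minimal sketch, where the log bucket, cluster id, and step id are placeholders matching whatever was configured while creating the cluster:

# list the steps of the cluster under the configured log location
aws s3 ls s3://bdclouddemo/logs/j-XXXXXXXXXXXXX/steps/

# download and inspect the stderr of a particular step
aws s3 cp s3://bdclouddemo/logs/j-XXXXXXXXXXXXX/steps/s-XXXXXXXXXXXXX/stderr.gz .
gunzip -c stderr.gz | less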