Develop Application using IDE - Externalize Properties

Here is the final code, which contains the logic to read the externalized properties and process the data to compute daily product revenue. It covers the following steps:

  • Import necessary classes and APIs
  • Load run-time properties
  • Build the SparkSession object
  • Read orders, order_items and products
  • Filter for COMPLETE or CLOSED orders
  • Join the filtered orders with order_items and products
  • Select order_date, product_name and order_item_subtotal from the join results
  • Compute daily product revenue using order_date and product_name as the key
  • Sort the data and save the output to files
package retail

import com.typesafe.config.ConfigFactory

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{round, sum, col}

object GetDailyProductRevenue {
  def main(args: Array[String]): Unit = {
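    // Load run-time properties for the environment (dev, prod, etc.) passed as the first argument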
    val conf = ConfigFactory.load
    val props = conf.getConfig(args(0))

    val inputBaseDir = props.getString("input.base.dir")
    val outputBaseDir = props.getString("output.base.dir")
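
    // Build the SparkSession; run in local mode only for the dev environment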
    val spark = if(args(0) == "dev") {
      SparkSession.
        builder().
        master("local").
        appName("Get Daily Product Revenue").
        getOrCreate()
    } else {
      SparkSession.
        builder().
        appName("Get Daily Product Revenue").
        getOrCreate()
    }

    import spark.implicits._
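
    // Read orders, order_items and products using explicit schemas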
    val orders = spark.read.
      schema("order_id INT, order_date TIMESTAMP, order_customer_id INT, order_status STRING").
      csv(inputBaseDir + "/orders")

    val orderItems = spark.read.
      schema(
        s"""order_item_id INT,
        order_item_order_id INT,
        order_item_product_id INT,
        order_item_quantity INT,
        order_item_subtotal FLOAT,
        order_item_product_price FLOAT""").
      csv(inputBaseDir + "/order_items")

    val products = spark.read.
      schema(
        s"""product_id INT,
        product_category_id INT,
        product_name STRING,
        product_description STRING,
        product_price FLOAT,
        product_image STRING""").
      csv(inputBaseDir + "/products")

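    // Keep only COMPLETE or CLOSED orders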
    val ordersCompleted = orders.
      filter("order_status IN ('COMPLETE', 'CLOSED')")

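    // Join the filtered orders with order_items and products and select the required columns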
    val joinResults = ordersCompleted.
      join(orderItems, ordersCompleted("order_id") === orderItems("order_item_order_id")).
      join(products, products("product_id") === orderItems("order_item_product_id")).
      select("order_date", "product_name", "order_item_subtotal")

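    // Compute daily product revenue by order_date and product_name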
    val dailyProductRevenue = joinResults.
      groupBy("order_date", "product_name").
      agg(round(sum("order_item_subtotal"), 2).alias("revenue"))

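    // Sort by order_date and then by revenue in descending order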
    val dailyProductRevenueSorted = dailyProductRevenue.
      orderBy($"order_date", col("revenue").desc)

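    // Reduce the number of shuffle partitions and save the output as CSV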
    spark.conf.set("spark.sql.shuffle.partitions", "2")
    dailyProductRevenueSorted.write.mode("overwrite").csv(outputBaseDir + "/daily_product_revenue")
  }
}
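
The code above expects the run-time properties to be available on the classpath, typically in src/main/resources/application.properties, with one set of keys per environment. The paths below are only placeholders; a minimal sketch of such a file might look like this:

dev.input.base.dir = /data/retail_db
dev.output.base.dir = /data/retail_db/output

prod.input.base.dir = /public/retail_db
prod.output.base.dir = /user/training/retail_db/output

Reading these properties relies on the Typesafe Config library (the com.typesafe config artifact), which needs to be declared as a dependency in build.sbt. The environment name is passed as the first program argument, so the application can be run from the IDE with dev as the program argument, or on a cluster with something like spark-submit --class retail.GetDailyProductRevenue <jar> prod.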
