Exercise 15 - Spark - Compute average revenue per day for all completed orders

apache-spark
scala
#1

Problem Statement:

  • Compute average revenue per day
  • Get completed orders
  • Join orders and order_items and get the required fields to compute average revenue
  • Divide the revenue per day by the number of unique orders per day to get the average revenue
  • Hint: You need to use a combination of map, reduceByKey/aggregateByKey, and join to get the final result (a minimal sketch of this pattern follows the question below)

Please provide following details:

  • Provide a high-level design highlighting all the steps you are going to perform
  • Complete Spark-based Scala code
  • Get first 5 records (use sortByKey before using take)
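
A minimal sketch of the hinted pattern, assuming comma-delimited retail_db data (order_status in field 3 of orders, order_item_subtotal in field 4 of order_items) and the /public/retail_db paths used in several replies; adjust paths and field positions for your environment:

// Sketch only: paths and field positions are assumptions, not part of the original question
val orders = sc.textFile("/public/retail_db/orders")
val orderItems = sc.textFile("/public/retail_db/order_items")

// (orderId, orderDate) for COMPLETE orders only
val completed = orders.filter(_.split(",")(3) == "COMPLETE").
  map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))

// (orderId, totalRevenue) -- sum of order_item_subtotal per order
val revenuePerOrder = orderItems.
  map(rec => (rec.split(",")(1).toInt, rec.split(",")(4).toDouble)).
  reduceByKey(_ + _)

// join on orderId, re-key by date, carry a running (revenueSum, orderCount) per day, then divide
val avgRevenuePerDay = completed.join(revenuePerOrder).
  map { case (_, (date, revenue)) => (date, revenue) }.
  aggregateByKey((0.0, 0))(
    (acc, rev) => (acc._1 + rev, acc._2 + 1),
    (a, b) => (a._1 + b._1, a._2 + b._2)).
  map { case (date, (sum, count)) => (date, sum / count) }

avgRevenuePerDay.sortByKey().take(5).foreach(println)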

#2

val orders = sc.textFile("/user/nagellarajashyam/sqoop_import/orders")
//Load the orders using sc

val ordersRDD = orders.map(rec=>(rec.split(",")(0),rec))
//Split the data to (orderId,rec)

val filteredOrdersRDD = ordersRDD.filter(rec=>rec._2.split(",")(3)=="COMPLETE")
//filter the data to keep only completed orders

val orderItems=sc.textFile("/user/nagellarajashyam/sqoop_import/order_items")
//Load the orderItems

val orderItemsRDD= orderItems.map(rec=>(rec.split(",")(1),rec))
//split the data to (orderItemOrderId,rec)

val joinRDD = filteredOrdersRDD.join(orderItemsRDD)

//JoinRDD will contain (orderId,(OrdersTuple,orderItemsTuple))

val joinParsedRDD = joinRDD.map(rec=>(rec._1,(rec._2._1.split(",")(1),rec._2._2.split(",")(4).toFloat)))

//joinParsedRDD will contain (orderId,(date,revenue))

val UniqueJoinParsedRDD = joinParsedRDD.map(rec=>((rec._1,rec._2._1),rec._2._2)).reduceByKey(_ + _)

//UniqueJoinParsedRDD will contain one record per order: ((orderId,date),orderRevenue)

val UniqueJoinParsedRDDMap= UniqueJoinParsedRDD.map(rec=>(rec._1._1,(rec._1._2,rec._2.toFloat)))

val revenueMap = UniqueJoinParsedRDDMap.map(rec=>(rec._2._1,1))

//for identifying the number of orders, generate a tuple of (date,1)

val ordersPerDay = revenueMap.reduceByKey(_ + _)

//ordersPerDay will contain tuples of (date, number of orders)

val revenuePerDay = UniqueJoinParsedRDDMap.map(rec=>(rec._2._1,rec._2._2))

//revenuePerDay will contain (date, orderRevenue); there will be multiple records per date (one per order)

val aggRevenuePerDay = revenuePerDay.reduceByKey(_ + _)

// aggRevenuePerDay will contain the revenue per day as (date, totalRevenue)

val totalRevenuePerDay = ordersPerDay.join(aggRevenuePerDay)

//This will give (date,(ordersPerDay,revenue))

val totalAggRevenuePerDay = totalRevenuePerDay.map(rec=>(rec._1,rec._2._2/rec._2._1))

//This final RDD will contain (date, averageRevenue)

totalAggRevenuePerDay.sortByKey().take(10).foreach(println)

//Sorted data

(2013-07-25 00:00:00.0,606.97943)
(2013-07-26 00:00:00.0,585.63745)
(2013-07-27 00:00:00.0,602.84033)
(2013-07-28 00:00:00.0,519.4791)
(2013-07-29 00:00:00.0,611.9819)
(2013-07-30 00:00:00.0,563.7529)
(2013-07-31 00:00:00.0,683.8797)
(2013-08-01 00:00:00.0,606.8479)
(2013-08-02 00:00:00.0,546.7679)
(2013-08-03 00:00:00.0,621.9413)

//using aggregateByKey

val joinParsedRDDMap = joinParsedRDD.map(rec=>(rec._2._1,rec._2._2))

val revenuePerDayWithOrders = joinParsedRDDMap.aggregateByKey((0.0,0))((acc,value)=>(acc._1+value,acc._2+1),(t1,t2)=>(t1._1+t2._1,t1._2+t2._2))
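
revenuePerDayWithOrders now holds (date, (totalRevenue, count)); a final map along these lines (a sketch, the name is illustrative) would produce the average. Note that joinParsedRDDMap has one record per order item, so this variant averages over order items rather than unique orders:

// illustrative name; divides the summed revenue by the per-item count for each date
val avgRevenuePerItem = revenuePerDayWithOrders.map(rec => (rec._1, rec._2._1 / rec._2._2))
avgRevenuePerItem.sortByKey().take(5).foreach(println)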


#3

val orderItemsMap = orderItems.map(rec=>(rec.split(",")(1).toInt ,
rec.split(",")(4).toDouble)
)
val redOrderItemsMap = orderItemsMap.reduceByKey(_ + _)
redOrderItemsMap: org.apache.spark.rdd.RDD[(Int, Double)] = ShuffledRDD[10] at reduceByKey at <console>:31

val completeOrders = orders.filter(rec => rec.split(",")(3) == "COMPLETE")
completeOrders: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:29

val completeOrdersMap= completeOrders.map(rec=>(rec.split(",")(0).toInt,
rec.split(",")(1)
))
scala> val joinRdd= completeOrdersMap.join(redOrderItemsMap)

val revenuerdd= joinRdd.map(rec=>(rec._2._1, rec._2._2))

val aggRdd = revenuerdd.aggregateByKey((0.0,0))(
(a,v)=>(a._1+v, a._2+1),
(v1,v2)=> (v1._1+v2._1 , v1._2 +v2._2)
)

val avgRevenue= aggRdd.map(rec=>(rec._1,(rec._2._1/rec._2._2)))

avgRevenue.sortByKey().take(5).foreach(println)

(2013-07-25 00:00:00.0,606.979393939394)
(2013-07-26 00:00:00.0,585.6372222222221)
(2013-07-27 00:00:00.0,602.8401818181818)
(2013-07-28 00:00:00.0,519.4790384615385)
(2013-07-29 00:00:00.0,611.982)


#4

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkExercise15 {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Sneh Exercise 15").setMaster("local")
    val sc = new SparkContext(conf)

    val orderItems = sc.textFile("/public/retail_db/order_items")
    val oiMap = orderItems.map(rec => (rec.split(",")(1).toInt, rec.split(",")(4).toDouble))
    val revenueOI = oiMap.reduceByKey((a, b) => a + b)

    val orders = sc.textFile("/public/retail_db/orders")
    val orderComplete = orders.filter(rec => rec.split(",")(3) == "COMPLETE").map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))

    val joinedOiOc = orderComplete.join(revenueOI)

    val revenueOiOc = joinedOiOc.map(record => (record._2._1, record._2._2))

    val revenuebefAvg = revenueOiOc.aggregateByKey((0.0, 0))(
      (a, v) => (a._1 + v, a._2 + 1),
      (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))

    val revenueAfterAvg = revenuebefAvg.map(record => (record._1, record._2._1 / record._2._2))

    revenueAfterAvg.sortByKey().take(5).foreach(println)

    sc.stop()
  }
}


[infosnehasish@gw01 ~]$ spark-submit --class SparkExercise15 sparkexercise15_2.11-1.0.jar


(2013-07-25 00:00:00.0,606.979393939394)
(2013-07-26 00:00:00.0,585.6372222222222)
(2013-07-27 00:00:00.0,602.840181818182)
(2013-07-28 00:00:00.0,519.4790384615385)
(2013-07-29 00:00:00.0,611.982)
(2013-07-30 00:00:00.0,563.7529166666667)
(2013-07-31 00:00:00.0,683.8798529411764)
(2013-08-01 00:00:00.0,606.8477777777778)
(2013-08-02 00:00:00.0,546.76776119403)


#5

val orderItems = sc.textFile("/public/retail_db/order_items")
val oiMap = orderItems.map(records => (records.split(",")(1).toInt, records.split(",")(4).toDouble))
val revenueOI = oiMap.reduceByKey((a, b) => a + b)

val orders = sc.textFile("/public/retail_db/orders")
val orderComplete = orders.filter(records => records.split(",")(3) == "COMPLETE")
val parsedorders=orderComplete.map(records => (records.split(",")(0).toInt, records.split(",")(1)))

val joinedOiOc = parsedorders.join(revenueOI)

val revenueOiOc = joinedOiOc.map(recordsord => (recordsord._2._1, recordsord._2._2))

val revenuebefAvg = revenueOiOc.aggregateByKey((0.0, 0))(
(a, v) => (a._1 + v, a._2 + 1),
(t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))

val revenueAfterAvg = revenuebefAvg.map(recordsord => (recordsord._1, recordsord._2._1 / recordsord._2._2))

val sortedrevavg=revenueAfterAvg.sortByKey()
sortedrevavg.take(15).foreach(println)

(2013-07-25 00:00:00.0,606.979393939394)
(2013-07-26 00:00:00.0,585.6372222222221)
(2013-07-27 00:00:00.0,602.8401818181818)
(2013-07-28 00:00:00.0,519.4790384615385)
(2013-07-29 00:00:00.0,611.982)
(2013-07-30 00:00:00.0,563.7529166666668)
(2013-07-31 00:00:00.0,683.8798529411765)
(2013-08-01 00:00:00.0,606.8477777777778)
(2013-08-02 00:00:00.0,546.7677611940298)
(2013-08-03 00:00:00.0,621.94125)
(2013-08-04 00:00:00.0,556.6376595744681)
(2013-08-05 00:00:00.0,549.0282978723404)
(2013-08-06 00:00:00.0,559.6458108108108)
(2013-08-07 00:00:00.0,594.9641509433964)
(2013-08-08 00:00:00.0,594.7623913043478)


#6

val orders=sc.textFile("file:///tmp/retail_db/orders/part-00000")
val cmpl_orders=orders.filter(_.split(",")(3) == "COMPLETE")
cmpl_orders.take(5).foreach(println)
val ordersMap = cmpl_orders.map(a => (a.split(",")(0).toInt,a.split(",")(1)))
ordersMap.take(5).foreach(println)

val order_item=sc.textFile("file:///tmp/retail_db/order_items/part-00000")
order_item.take(5).foreach(println)
val ordersItemMap = order_item.map(a => (a.split(",")(1).toInt,a.split(",")(4).toDouble))
ordersItemMap.take(5).foreach(println)
val redOitem = ordersItemMap.reduceByKey(_ + _)
redOitem.take(5).foreach(println)

val ordJoin=redOitem.join(ordersMap).map(_._2)
ordJoin.take(5).foreach(println)

val ordJoinmap = ordJoin.map(rec => (rec._2, rec._1))
ordJoinmap.take(5).foreach(println)

val agrFile= ordJoinmap.aggregateByKey((0.0, 0))((a,v)=>(a._1+v , a._2+1), (t1, t2) => (t1._1+t2._1, t1._2+t2._2))
val avgrev = agrFile.map(rec => (rec._1, rec._2._1 / rec._2._2))
ordJoinmap.take(10).foreach(println)

scala> avgrev.take(10).foreach(println)

(2013-10-05 00:00:00.0,597.1444642857144)
(2014-07-17 00:00:00.0,516.4504444444444)
(2014-05-06 00:00:00.0,560.3745945945946)
(2014-05-17 00:00:00.0,561.5159615384614)
(2014-04-25 00:00:00.0,614.5466666666667)
(2013-08-11 00:00:00.0,519.5778048780487)
(2013-12-10 00:00:00.0,589.0835714285714)
(2014-03-04 00:00:00.0,616.2697435897435)
(2013-10-29 00:00:00.0,636.3313513513514)
(2013-08-08 00:00:00.0,594.7623913043478)


#7

Code in Spark-Shell

val loadFile=sc.textFile("file:///tmp/retail_db/orders/part-00000")
val filterLoadFile = loadFile.filter(rec => rec.split(",")(3).equals("COMPLETE"))
val reqFields = filterLoadFile.map(rec=> (rec.split(",")(0).toInt,rec.split(",")(1)))

val loadOrderItems=sc.textFile("file:///tmp/retail_db/order_items/part-00000")
val loadOrderItemMap=loadOrderItems.map(rec=> (rec.split(",")(1).toInt, rec.split(",")(4).toDouble))
val reduceOrderItemMap=loadOrderItemMap.reduceByKey((acc, value)=> acc + value)

val joinsFiles = reduceOrderItemMap.join(reqFields)
val mapAgain = joinsFiles.map(rec=> (rec._2._2, rec._2._1))


val agrFile= mapAgain.aggregateByKey((0.0, 0))((a,v)=>(a._1+v , a._2+1), (t1, t2) => (t1._1+t2._1, t1._2+t2._2))

val avgRev = agrFile.map(rec => (rec._1, rec._2._1 / rec._2._2))
avgRev.take(10).foreach(println)

(2013-10-05 00:00:00.0,597.1444642857144)
(2014-07-17 00:00:00.0,516.4504444444444)
(2014-05-06 00:00:00.0,560.3745945945946)
(2014-05-17 00:00:00.0,561.5159615384614)
(2014-04-25 00:00:00.0,614.5466666666667)
(2013-08-11 00:00:00.0,519.5778048780487)
(2013-12-10 00:00:00.0,589.0835714285714)
(2014-03-04 00:00:00.0,616.2697435897435)
(2013-10-29 00:00:00.0,636.3313513513514)
(2013-08-08 00:00:00.0,594.7623913043478)

#8
1. Load the datasets
val orders = sc.textFile("/public/retail_db/orders")

val orderItems = sc.textFile("/public/retail_db/order_items")

2. Filter completed orders
val ordercom = orders.filter(_.split(",")(3).equals("COMPLETE"))

3. Map orders -> (order_id, order_date)
val ordercom_join = ordercom.map(line => {
  val lines = line.split(',')
  (lines(0).toInt, lines(1).toString)
})

4. Map order_items -> (order_id, order_item_subtotal)
val order_items_join = orderItems.map(line => {
  val lines = line.split(',')
  (lines(1).toInt, lines(4).toDouble)
})

5. reduceByKey to get revenue per order
order_items_join.reduceByKey((x,y)=>(x+y)).take(5).foreach(println)

val reduceMapOrders = order_items_join.reduceByKey((x,y)=>(x+y))

6. Join revenue per order with completed orders
reduceMapOrders.join(ordercom_join).take(5).foreach(println)
val joinrdd = reduceMapOrders.join(ordercom_join)
(65722,(1319.89,2014-05-23 00:00:00.0))
(23776,(329.98,2013-12-20 00:00:00.0))
(53926,(219.97,2014-06-30 00:00:00.0))
(55194,(759.94,2014-07-09 00:00:00.0))
(30114,(329.98,2014-01-28 00:00:00.0))

7. Map to (order_date, revenue)
val reverdd = joinrdd.map(rec=>(rec._2._2, rec._2._1))
(2014-05-23 00:00:00.0,1319.89)
(2013-12-20 00:00:00.0,329.98)
(2014-06-30 00:00:00.0,219.97)
(2014-07-09 00:00:00.0,759.94)
(2014-01-28 00:00:00.0,329.98)

8. aggregateByKey to get (total revenue, order count) per day
val aggRdd = reverdd.aggregateByKey((0.0,0))(
(a,v)=>(a._1+v, a._2+1),
(v1,v2)=> (v1._1+v2._1 , v1._2 +v2._2)
)

9. Compute the average
val avgRevenue= aggRdd.map(rec=>(rec._1,(rec._2._1/rec._2._2)))

10. Sort and take the first 5
avgRevenue.sortByKey().take(5).foreach(println)
(2013-07-25 00:00:00.0,606.979393939394)
(2013-07-26 00:00:00.0,585.6372222222221)
(2013-07-27 00:00:00.0,602.8401818181818)
(2013-07-28 00:00:00.0,519.4790384615385)
(2013-07-29 00:00:00.0,611.982)


#9

scala> val orders = sc.textFile("/public/retail_db/orders")

scala> val orderItems = sc.textFile("/public/retail_db/order_items")

scala> val compl_orders = orders.filter(_.split(",")(3).equals("COMPLETE"))

scala> val map_compl_orders = compl_orders.map(rec => (rec.split(",")(0).toInt,rec.split(",")(1).toString))

scala> val map_order_items = orderItems.map(rec => (rec.split(",")(1).toInt,rec.split(",")(4).toDouble))

scala> val reduced_map_order_items = map_order_items.reduceByKey((x , y) => x + y)

scala> val joinRDD = map_compl_orders.join(reduced_map_order_items)

scala> val MapJoinRDD = joinRDD.map(rec => (rec._2))

scala> val beforeAvgRevenue = MapJoinRDD.aggregateByKey((0.0, 0))(
| (a, v) => (a._1 + v, a._2 + 1),
| (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))

scala> beforeAvgRevenue.take(5).foreach(println)

scala> val avgRevenue= beforeAvgRevenue.map(rec=>(rec._1,(rec._2._1/rec._2._2)))

scala> avgRevenue.sortByKey().take(5).foreach(println)

(2013-07-25 00:00:00.0,606.979393939394)
(2013-07-26 00:00:00.0,585.6372222222221)
(2013-07-27 00:00:00.0,602.8401818181818)
(2013-07-28 00:00:00.0,519.4790384615385)
(2013-07-29 00:00:00.0,611.982)


#10

# Complete Spark-based Scala code

package training

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Level
import org.apache.log4j.Logger

object FromTraining {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[4]").setAppName("Spark word count")
    val sc = new SparkContext(conf)

    val orders = sc.textFile("hdfs://quickstart:8020/user/analytics/retail_db/orders")
    val orderItems = sc.textFile("hdfs://quickstart:8020/user/analytics/retail_db/order_items")

    val completedOrders = orders.filter(_.split(",")(3).contains("COMPLETE"))

    val orderItemsToFilter = orderItems.flatMap { line =>
      line.split(",") match {
        case Array(id, order_id, productId, quantity, subtotal, product_price) => Some(order_id.toInt, subtotal.toDouble)
        case _ => None
      }
    }.reduceByKey(_ + _)

    val ordersToFilter = completedOrders.map(x => (x.split(",")(0).toInt, (x.split(",")(1).toString)))

    val joinedOrders = orderItemsToFilter.join(ordersToFilter)

    val afterJoin = joinedOrders.flatMap {
      case (orderId, (subTotal, orderDate)) => Some(orderDate, subTotal)
    }

    val aggregatedResult = afterJoin.aggregateByKey((0.0, 0))((a, v) => (a._1 + v, a._2 + 1), (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))

    val averageResultsForEachDate = aggregatedResult.map {
      case (date, (totalrevenue, totalDays)) =>
        (date, totalrevenue / totalDays)
    }

    averageResultsForEachDate.sortByKey(ascending = false).take(5).foreach(println)

  }
}

# Get first 5 records (use sortByKey before using take)

(2014-07-24 00:00:00.0,628.2187272727272)
(2014-07-23 00:00:00.0,637.06275)
(2014-07-22 00:00:00.0,600.71125)
(2014-07-21 00:00:00.0,621.0714754098361)