# Exercise 15 - Spark - Compute average revenue per day for all completed orders

Problem Statement:

• Compute average revenue per day
• Get completed orders
• Join orders and order_items and get required fields to compute average revenue
• Divide the revenue per day by the number of unique orders per day to get the average revenue
• Hint: You need to use combination of map, reduceByKey/aggregateByKey, join to get the final result

• Provide high level design highlighting all the steps you are going to perform
• Complete spark based scala code
• Get first 5 records (use sortByKey before using take)

// Solution 1 (reduceByKey): average revenue per day over COMPLETE orders.
// Steps: filter COMPLETE orders, join with order_items, collapse item
// subtotals into one revenue figure per order, then per date divide the
// total revenue by the number of distinct orders.

val orders = sc.textFile("/user/nagellarajashyam/sqoop_import/orders")

// (orderId, wholeOrderRecord) — orderId is field 0.
val ordersRDD = orders.map(rec => (rec.split(",")(0), rec))

// Keep only orders whose status (field 3) is COMPLETE.
val filteredOrdersRDD = ordersRDD.filter(rec => rec._2.split(",")(3) == "COMPLETE")

val orderItems = sc.textFile("/user/nagellarajashyam/sqoop_import/order_items")

// (orderItemOrderId, wholeOrderItemRecord) — the order id is field 1.
val orderItemsRDD = orderItems.map(rec => (rec.split(",")(1), rec))

// (orderId, (orderRecord, orderItemRecord))
val joinRDD = filteredOrdersRDD.join(orderItemsRDD)

// (orderId, (orderDate, itemSubtotal)) — date is field 1 of orders,
// subtotal is field 4 of order_items.
val joinParsedRDD = joinRDD.map(rec => (rec._1, (rec._2._1.split(",")(1), rec._2._2.split(",")(4).toFloat)))

// Sum the item subtotals per (orderId, date) so each order contributes
// exactly one revenue figure.  (Original had `rec.2.2` and `reduceByKey(+)`
// — underscores lost in transcription.)
val uniqueJoinParsedRDD = joinParsedRDD.map(rec => ((rec._1, rec._2._1), rec._2._2)).reduceByKey(_ + _)

// Re-shape to (orderId, (orderDate, orderRevenue)).
val uniqueJoinParsedRDDMap = uniqueJoinParsedRDD.map(rec => (rec._1._1, (rec._1._2, rec._2)))

// (date, 1) — one marker per distinct order, for counting orders per day.
val revenueMap = uniqueJoinParsedRDDMap.map(rec => (rec._2._1, 1))

// (date, numberOfOrders)
val ordersPerDay = revenueMap.reduceByKey(_ + _)

// (date, orderRevenue) — one entry per order, so dates repeat.
val revenuePerDay = uniqueJoinParsedRDDMap.map(rec => (rec._2._1, rec._2._2))

// (date, totalRevenue)
val aggRevenuePerDay = revenuePerDay.reduceByKey(_ + _)

// (date, (numberOfOrders, totalRevenue))
val totalRevenuePerDay = ordersPerDay.join(aggRevenuePerDay)

// (date, averageRevenue) = totalRevenue / numberOfOrders
val totalAggRevenuePerDay = totalRevenuePerDay.map(rec => (rec._1, rec._2._2 / rec._2._1))

// Sort by date before taking, as the exercise requires.
totalAggRevenuePerDay.sortByKey().take(10).foreach(println)

//Sorted data

(2013-07-25 00:00:00.0,606.97943)
(2013-07-26 00:00:00.0,585.63745)
(2013-07-27 00:00:00.0,602.84033)
(2013-07-28 00:00:00.0,519.4791)
(2013-07-29 00:00:00.0,611.9819)
(2013-07-30 00:00:00.0,563.7529)
(2013-07-31 00:00:00.0,683.8797)
(2013-08-01 00:00:00.0,606.8479)
(2013-08-02 00:00:00.0,546.7679)
(2013-08-03 00:00:00.0,621.9413)

// Alternative for solution 1 using aggregateByKey: compute
// (sumRevenue, count) per date in a single pass over (date, revenue) pairs.
// NOTE(review): this feeds joinParsedRDD (one row per order ITEM), so the
// count here is items per day, not orders per day — confirm which average
// is intended before relying on it.

val joinParsedRDDMap = joinParsedRDD.map(rec => (rec._2._1, rec._2._2))

// Accumulator is (runningSum, runningCount).  The original named the lambda
// parameter `val`, which is a Scala reserved word and does not compile.
val revenuePerDayWithOrders = joinParsedRDDMap.aggregateByKey((0.0, 0))((acc, v) => (acc._1 + v, acc._2 + 1), (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))

// Solution 2: sum item revenue per order first, then join with COMPLETE
// orders and average per date with aggregateByKey.
// (REPL echo lines and the `scala>` prompt from the original transcript
// have been removed so the block pastes cleanly into spark-shell.)

// (orderId, itemSubtotal)
val orderItemsMap = orderItems.map(rec => (rec.split(",")(1).toInt,
  rec.split(",")(4).toDouble)
)
// (orderId, totalOrderRevenue)
val redOrderItemsMap = orderItemsMap.reduceByKey(_ + _)

val completeOrders = orders.filter(rec => rec.split(",")(3) == "COMPLETE")

// (orderId, orderDate)
val completeOrdersMap = completeOrders.map(rec => (rec.split(",")(0).toInt,
  rec.split(",")(1)
))

// (orderId, (orderDate, orderRevenue))
val joinRdd = completeOrdersMap.join(redOrderItemsMap)

// (orderDate, orderRevenue)
val revenuerdd = joinRdd.map(rec => (rec._2._1, rec._2._2))

// Per date: (sumRevenue, orderCount).
val aggRdd = revenuerdd.aggregateByKey((0.0, 0))(
  (a, v) => (a._1 + v, a._2 + 1),
  (v1, v2) => (v1._1 + v2._1, v1._2 + v2._2)
)

// (orderDate, averageRevenue)
val avgRevenue = aggRdd.map(rec => (rec._1, (rec._2._1 / rec._2._2)))

avgRevenue.sortByKey().take(5).foreach(println)

(2013-07-25 00:00:00.0,606.979393939394)
(2013-07-26 00:00:00.0,585.6372222222221)
(2013-07-27 00:00:00.0,602.8401818181818)
(2013-07-28 00:00:00.0,519.4790384615385)
(2013-07-29 00:00:00.0,611.982)

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkExercise15 {
  // Computes the average revenue per day over COMPLETE orders and prints
  // the first five dates in ascending order.  (Stray ``````-fence residue
  // that had been pasted into the method body has been removed.)
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Sneh Exercise 15").setMaster("local")
    val sc = new SparkContext(conf)

    // (orderId, totalItemRevenue) — subtotal is field 4 of order_items.
    val orderItems = sc.textFile("/public/retail_db/order_items")
    val oiMap = orderItems.map(rec => (rec.split(",")(1).toInt, rec.split(",")(4).toDouble))
    val revenueOI = oiMap.reduceByKey((a, b) => a + b)

    // (orderId, orderDate) for COMPLETE orders only.
    val orders = sc.textFile("/public/retail_db/orders")
    val orderComplete = orders.filter(rec => rec.split(",")(3) == "COMPLETE").map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))

    // (orderId, (orderDate, orderRevenue))
    val joinedOiOc = orderComplete.join(revenueOI)

    // (orderDate, orderRevenue)
    val revenueOiOc = joinedOiOc.map(record => (record._2._1, record._2._2))

    // Per date: (sumRevenue, orderCount).
    val revenuebefAvg = revenueOiOc.aggregateByKey((0.0, 0))(
      (a, v) => (a._1 + v, a._2 + 1),
      (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))

    // (orderDate, averageRevenue)
    val revenueAfterAvg = revenuebefAvg.map(record => (record._1, record._2._1 / record._2._2))

    revenueAfterAvg.sortByKey().take(5).foreach(println)

    sc.stop()

  }
}

[infosnehasish@gw01 ~]$ spark-submit --class SparkExercise15 sparkexercise15_2.11-1.0.jar

(2013-07-25 00:00:00.0,606.979393939394)
(2013-07-26 00:00:00.0,585.6372222222222)
(2013-07-27 00:00:00.0,602.840181818182)
(2013-07-28 00:00:00.0,519.4790384615385)
(2013-07-29 00:00:00.0,611.982)
(2013-07-30 00:00:00.0,563.7529166666667)
(2013-07-31 00:00:00.0,683.8798529411764)
(2013-08-01 00:00:00.0,606.8477777777778)
(2013-08-02 00:00:00.0,546.76776119403)

// Spark-shell version of the same pipeline (smart quotes around COMPLETE
// fixed so the filter actually compiles).

// (orderId, totalItemRevenue)
val orderItems = sc.textFile("/public/retail_db/order_items")
val oiMap = orderItems.map(records => (records.split(",")(1).toInt, records.split(",")(4).toDouble))
val revenueOI = oiMap.reduceByKey((a, b) => a + b)

// (orderId, orderDate) for COMPLETE orders.
val orders = sc.textFile("/public/retail_db/orders")
val orderComplete = orders.filter(records => records.split(",")(3) == "COMPLETE")
val parsedorders = orderComplete.map(records => (records.split(",")(0).toInt, records.split(",")(1)))

// (orderId, (orderDate, orderRevenue))
val joinedOiOc = parsedorders.join(revenueOI)

// (orderDate, orderRevenue)
val revenueOiOc = joinedOiOc.map(recordsord => (recordsord._2._1, recordsord._2._2))

// Per date: (sumRevenue, orderCount).
val revenuebefAvg = revenueOiOc.aggregateByKey((0.0, 0))(
  (a, v) => (a._1 + v, a._2 + 1),
  (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))

// (orderDate, averageRevenue)
val revenueAfterAvg = revenuebefAvg.map(recordsord => (recordsord._1, recordsord._2._1 / recordsord._2._2))

val sortedrevavg = revenueAfterAvg.sortByKey()
sortedrevavg.take(15).foreach(println)

(2013-07-25 00:00:00.0,606.979393939394)
(2013-07-26 00:00:00.0,585.6372222222221)
(2013-07-27 00:00:00.0,602.8401818181818)
(2013-07-28 00:00:00.0,519.4790384615385)
(2013-07-29 00:00:00.0,611.982)
(2013-07-30 00:00:00.0,563.7529166666668)
(2013-07-31 00:00:00.0,683.8798529411765)
(2013-08-01 00:00:00.0,606.8477777777778)
(2013-08-02 00:00:00.0,546.7677611940298)
(2013-08-03 00:00:00.0,621.94125)
(2013-08-04 00:00:00.0,556.6376595744681)
(2013-08-05 00:00:00.0,549.0282978723404)
(2013-08-06 00:00:00.0,559.6458108108108)
(2013-08-07 00:00:00.0,594.9641509433964)
(2013-08-08 00:00:00.0,594.7623913043478)

// Solution reading from local files.  Fixed: smart quotes around the file
// paths and COMPLETE, and a missing closing paren on the ordJoinmap line.

val orders = sc.textFile("file:///tmp/retail_db/orders/part-00000")
// COMPLETE orders only (status is field 3).
val cmpl_orders = orders.filter(_.split(",")(3) == "COMPLETE")
cmpl_orders.take(5).foreach(println)
// (orderId, orderDate)
val ordersMap = cmpl_orders.map(a => (a.split(",")(0).toInt, a.split(",")(1)))
ordersMap.take(5).foreach(println)

val order_item = sc.textFile("file:///tmp/retail_db/order_items/part-00000")
order_item.take(5).foreach(println)
// (orderId, itemSubtotal) summed per order.
val ordersItemMap = order_item.map(a => (a.split(",")(1).toInt, a.split(",")(4).toDouble))
ordersItemMap.take(5).foreach(println)
val redOitem = ordersItemMap.reduceByKey(_ + _)
redOitem.take(5).foreach(println)

// Join and drop the order id: (orderRevenue, orderDate).
val ordJoin = redOitem.join(ordersMap).map(_._2)
ordJoin.take(5).foreach(println)

// Swap to (orderDate, orderRevenue) for the per-day aggregation.
val ordJoinmap = ordJoin.map(rec => (rec._2, rec._1))
ordJoinmap.take(5).foreach(println)

// Per date (sumRevenue, orderCount), then divide for the average.
val agrFile = ordJoinmap.aggregateByKey((0.0, 0))((a, v) => (a._1 + v, a._2 + 1), (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))
val avgrev = agrFile.map(rec => (rec._1, rec._2._1 / rec._2._2))
avgrev.take(10).foreach(println)

scala> avgrev.take(10).foreach(println)

(2013-10-05 00:00:00.0,597.1444642857144)
(2014-07-17 00:00:00.0,516.4504444444444)
(2014-05-06 00:00:00.0,560.3745945945946)
(2014-05-17 00:00:00.0,561.5159615384614)
(2014-04-25 00:00:00.0,614.5466666666667)
(2013-08-11 00:00:00.0,519.5778048780487)
(2013-12-10 00:00:00.0,589.0835714285714)
(2014-03-04 00:00:00.0,616.2697435897435)
(2013-10-29 00:00:00.0,636.3313513513514)
(2013-08-08 00:00:00.0,594.7623913043478)

Code in Spark-Shell

``````
val loadFile = sc.textFile("file:///tmp/retail_db/orders/part-00000")

// NOTE(review): the original snippet used loadOrderItemMap and reqFields
// without ever defining them; the definitions below are reconstructed from
// the other solutions in this file — verify against the original session.
val loadOrderItemFile = sc.textFile("file:///tmp/retail_db/order_items/part-00000")
// (orderId, itemSubtotal)
val loadOrderItemMap = loadOrderItemFile.map(rec => (rec.split(",")(1).toInt, rec.split(",")(4).toDouble))
// (orderId, orderDate) for COMPLETE orders.
val reqFields = loadFile.filter(rec => rec.split(",")(3) == "COMPLETE").map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))

// (orderId, totalOrderRevenue)
val reduceOrderItemMap = loadOrderItemMap.reduceByKey((acc, value) => acc + value)

// Join then re-key by date: (orderDate, orderRevenue).
val joinsFiles = reduceOrderItemMap.join(reqFields)
val mapAgain = joinsFiles.map(rec => (rec._2._2, rec._2._1))

// Per date (sumRevenue, orderCount).
val agrFile = mapAgain.aggregateByKey((0.0, 0))((a, v) => (a._1 + v, a._2 + 1), (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))

// (orderDate, averageRevenue)
val avgRev = agrFile.map(rec => (rec._1, rec._2._1 / rec._2._2))
avgRev.take(10).foreach(println)

(2013-10-05 00:00:00.0,597.1444642857144)
(2014-07-17 00:00:00.0,516.4504444444444)
(2014-05-06 00:00:00.0,560.3745945945946)
(2014-05-17 00:00:00.0,561.5159615384614)
(2014-04-25 00:00:00.0,614.5466666666667)
(2013-08-11 00:00:00.0,519.5778048780487)
(2013-12-10 00:00:00.0,589.0835714285714)
(2014-03-04 00:00:00.0,616.2697435897435)
(2013-10-29 00:00:00.0,636.3313513513514)
(2013-08-08 00:00:00.0,594.7623913043478)``````
``````
val orders = sc.textFile("/public/retail_db/orders")
``````

val orderItems = sc.textFile("/public/retail_db/order_items")

// Step 1: keep only COMPLETE orders (smart quotes fixed).
val ordercom = orders.filter(_.split(",")(3).equals("COMPLETE"))

// Step 2: map orders -> (order_id, order_date).
val ordercom_join = ordercom.map(line => {
  val lines = line.split(',')
  (lines(0).toInt, lines(1).toString)
})

// Step 3: map order_items -> (order_id, order_item_subtotal).
val order_items_join = orderItems.map(line => {
  val lines = line.split(',')
  (lines(1).toInt, lines(4).toDouble)
})

// Step 4: total revenue per order.
order_items_join.reduceByKey((x, y) => (x + y)).take(5).foreach(println)
val reduceMapOrders = order_items_join.reduceByKey((x, y) => (x + y))

// Step 5: join -> (order_id, (orderRevenue, orderDate)).  Sample:
//   (65722,(1319.89,2014-05-23 00:00:00.0))
//   (23776,(329.98,2013-12-20 00:00:00.0))
val joinrdd = reduceMapOrders.join(ordercom_join)
reduceMapOrders.join(ordercom_join).take(5).foreach(println)

// Step 6: re-key by date -> (orderDate, orderRevenue).
// NOTE(review): the original wrote (rec._2._1, rec._2._2), i.e.
// (revenue, date); the per-date aggregation below only works when the
// date is the key, so the two fields are swapped here.
val reverdd = joinrdd.map(rec => (rec._2._2, rec._2._1))

// Step 7: per date (sumRevenue, orderCount).
val aggRdd = reverdd.aggregateByKey((0.0, 0))(
  (a, v) => (a._1 + v, a._2 + 1),
  (v1, v2) => (v1._1 + v2._1, v1._2 + v2._2)
)

// Step 8: average revenue per day.
val avgRevenue = aggRdd.map(rec => (rec._1, (rec._2._1 / rec._2._2)))

// Step 9: first five dates.
avgRevenue.sortByKey().take(5).foreach(println)
(2013-07-25 00:00:00.0,606.979393939394)
(2013-07-26 00:00:00.0,585.6372222222221)
(2013-07-27 00:00:00.0,602.8401818181818)
(2013-07-28 00:00:00.0,519.4790384615385)
(2013-07-29 00:00:00.0,611.982)

// Spark-shell variant (prompts removed so the block pastes cleanly).
// Fixed: `(3)equals(“COMPLETE”)` was missing the dot before `equals` and
// used smart quotes.

val orders = sc.textFile("/public/retail_db/orders")
val orderItems = sc.textFile("/public/retail_db/order_items")

// COMPLETE orders -> (orderId, orderDate).
val compl_orders = orders.filter(_.split(",")(3).equals("COMPLETE"))
val map_compl_orders = compl_orders.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1).toString))

// (orderId, itemSubtotal) summed per order.
val map_order_items = orderItems.map(rec => (rec.split(",")(1).toInt, rec.split(",")(4).toDouble))
val reduced_map_order_items = map_order_items.reduceByKey((x, y) => x + y)

// (orderId, (orderDate, orderRevenue)) -> keep only the value pair.
val joinRDD = map_compl_orders.join(reduced_map_order_items)
val MapJoinRDD = joinRDD.map(rec => (rec._2))

// Per date (sumRevenue, orderCount).
val beforeAvgRevenue = MapJoinRDD.aggregateByKey((0.0, 0))(
  (a, v) => (a._1 + v, a._2 + 1),
  (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))

beforeAvgRevenue.take(5).foreach(println)

// (orderDate, averageRevenue)
val avgRevenue = beforeAvgRevenue.map(rec => (rec._1, (rec._2._1 / rec._2._2)))

avgRevenue.sortByKey().take(5).foreach(println)

(2013-07-25 00:00:00.0,606.979393939394)
(2013-07-26 00:00:00.0,585.6372222222221)
(2013-07-27 00:00:00.0,602.8401818181818)
(2013-07-28 00:00:00.0,519.4790384615385)
(2013-07-29 00:00:00.0,611.982)

#Complete spark based scala code

``````
package training

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Level
import org.apache.log4j.Logger

object FromTraining {
  // Average revenue per day over COMPLETE orders.  Prints the five most
  // recent dates: sortByKey(ascending = false) is deliberate here — the
  // sample output that follows this block is in descending date order.
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[4]").setAppName("Spark word count")
    val sc = new SparkContext(conf)

    val orders = sc.textFile("hdfs://quickstart:8020/user/analytics/retail_db/orders")
    val orderItems = sc.textFile("hdfs://quickstart:8020/user/analytics/retail_db/order_items")

    val completedOrders = orders.filter(_.split(",")(3).contains("COMPLETE"))

    // Parse order_items defensively: rows without exactly six fields are
    // dropped by the pattern match.  Result: (orderId, totalRevenue).
    val orderItemsToFilter = orderItems.flatMap { line =>
      line.split(",") match {
        case Array(id, order_id, productId, quantity, subtotal, product_price) => Some(order_id.toInt, subtotal.toDouble)
        case _ => None
      }
    }.reduceByKey(_ + _)

    // (orderId, orderDate)
    val ordersToFilter = completedOrders.map(x => (x.split(",")(0).toInt, (x.split(",")(1).toString)))

    val joinedOrders = orderItemsToFilter.join(ordersToFilter)

    // (orderDate, orderRevenue)
    val afterJoin = joinedOrders.flatMap {
      case (orderId, (subTotal, orderDate)) => Some(orderDate, subTotal)
    }

    // Per date (sumRevenue, orderCount).
    val aggregatedResult = afterJoin.aggregateByKey((0.0, 0))((a, v) => (a._1 + v, a._2 + 1), (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))

    val averageResultsForEachDate = aggregatedResult.map {
      case (date, (totalrevenue, totalDays)) =>
        (date, totalrevenue / totalDays)
    }

    averageResultsForEachDate.sortByKey(ascending = false).take(5).foreach(println)

  }
}
``````

#Get first 5 records (use sortByKey before using take)

``````(2014-07-24 00:00:00.0,628.2187272727272)
(2014-07-23 00:00:00.0,637.06275)
(2014-07-22 00:00:00.0,600.71125)
(2014-07-21 00:00:00.0,621.0714754098361)``````