Exercise 18 - Accumulators and Broadcast Variables

Tags: eclipse, apache-spark, scala
#1

Problem Statement

  • Compute total revenue per day per department (requires joining five tables)
  • Join departments, categories and products, build a product-to-department hash map, and broadcast it:
    val bv = sc.broadcast(productDepartmentsMap.collectAsMap())
  • While processing order_items, look up the department in the broadcast variable to get order_id, department_name and order_item_subtotal
  • While filtering for completed orders, increment an accumulator that counts the completed orders (see the sketch below)
  • Finally, join the two data sets to get the total revenue for each day and department
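
A minimal sketch of the two mechanisms the exercise tests (the names orders, orderItems and productDepartmentsMap are assumptions matching the solutions below; sc.accumulator is the Spark 1.x API used throughout this thread):

val completedAcc = sc.accumulator(0, "completed orders")
val bv = sc.broadcast(productDepartmentsMap.collectAsMap()) // small map, shipped once per executor

val completedOrders = orders.filter { rec =>
  val isComplete = rec.split(",")(3) == "COMPLETE"
  if (isComplete) completedAcc += 1 // side effect; value is meaningful only after an action runs
  isComplete                        // the predicate itself must still return the Boolean
}

val withDept = orderItems.map(rec =>
  (bv.value(rec.split(",")(2).toInt), rec.split(",")(4).toFloat)) // local lookup, no shuffle

The point to notice is that the accumulator update is only a side effect; the filter body must still return the Boolean.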

Please provide the following as output

  • Complete program
  • Sample data

#2

import com.typesafe.config._
import org.apache.hadoop.fs._
import org.apache.spark.{SparkConf, SparkContext}
object AvgRevenueDailyAccumulators {
def main(args: Array[String]) {
val appConf = ConfigFactory.load()
val conf = new SparkConf().
setAppName("Total Revenue per department - Daily").
setMaster(appConf.getConfig(args(2)).getString("deploymentMaster"))
val sc = new SparkContext(conf)

val inputPath = args(0)
val outputPath = args(1)

val fs = FileSystem.get(sc.hadoopConfiguration)
val inputPathExists = fs.exists(new Path(inputPath))
val outputPathExists = fs.exists(new Path(outputPath))

if (!inputPathExists) {
  println("Input Path does not exists")
  return
}

if (outputPathExists) {
  fs.delete(new Path(outputPath), true)
}

val departments = sc.textFile(inputPath + "/departments")
val categories = sc.textFile(inputPath + "/categories")
val products = sc.textFile(inputPath + "/products")

val departmentsMap = departments.
  map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
val categoriesMap = categories.
  map(rec => (rec.split(",")(0).toInt, rec.split(",")(1).toInt))
val productsMap = products.
  map(rec => (rec.split(",")(1).toInt, rec.split(",")(0).toInt))

val productCategories = productsMap.join(categoriesMap)
val productCategoriesMap = productCategories.
  map(rec => (rec._2._2, rec._2._1))
val productDepartments = productCategoriesMap.join(departmentsMap)
val productDepartmentsMap = productDepartments.
  map(rec => (rec._2._1, rec._2._2)).
  distinct

val bv = sc.broadcast(productDepartmentsMap.collectAsMap())
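// bv now holds the product_id -> department_name map; each executor gets one read-only copy,
// so the order_items lookup below needs no shuffle join against the three small tables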

val orders = sc.textFile(inputPath + "/orders")
val orderItems = sc.textFile(inputPath + "/order_items")

val ordersCompletedAccum = sc.accumulator(0, "ordersCompleted count")
val ordersFilterInvokedAccum = sc.accumulator(0, "orders filter invoked count")
val ordersCompleted = orders.filter(rec => {
    ordersFilterInvokedAccum += 1
    if (rec.split(",")(3) == "COMPLETE") {
      ordersCompletedAccum += 1
    }
    rec.split(",")(3) == "COMPLETE" // the predicate must return a Boolean; the accumulator updates are side effects
  }).map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
val orderItemsMap = orderItems.map(rec => (rec.split(",")(1).toInt, (bv.value(rec.split(",")(2).toInt),
      rec.split(",")(4).toFloat)))

val ordersJoin = ordersCompleted.join(orderItemsMap)
val revenuePerDayPerDepartment = ordersJoin.
  map(rec => ((rec._2._1, rec._2._2._1), rec._2._2._2)).
  reduceByKey((acc, value) => acc + value)
revenuePerDayPerDepartment.sortByKey().saveAsTextFile(outputPath)

}
}

Output:
((2013-07-25 00:00:00.0,Apparel),3279.5698)
((2013-07-25 00:00:00.0,Fan Shop),9798.689)
((2013-07-25 00:00:00.0,Fitness),394.93)
((2013-07-25 00:00:00.0,Footwear),3899.61)
((2013-07-25 00:00:00.0,Golf),2029.72)
((2013-07-25 00:00:00.0,Outdoors),627.80005)
((2013-07-26 00:00:00.0,Apparel),8828.75)
((2013-07-26 00:00:00.0,Fan Shop),20847.686)
((2013-07-26 00:00:00.0,Fitness),183.98001)
((2013-07-26 00:00:00.0,Footwear),5129.42)


#3

package org.test.spark

import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

object SparkEx18 {
def main(args: Array[String]): Unit = {
val appConf = ConfigFactory.load()
val conf = new SparkConf().setAppName("Spark Exercise 18")
.setMaster(appConf.getConfig(args(2)).getString("deployment"))
val sc = new SparkContext(conf)
val inputPath = args(0)
val outputPath = args(1)
val fs = FileSystem.get(sc.hadoopConfiguration)
val inputPathExists = fs.exists(new Path(inputPath))
val outputPathExists = fs.exists(new Path(outputPath))
if (!inputPathExists) {
println("Input path does not exist")
return
}

if (outputPathExists) {
  fs.delete(new Path(outputPath), true)
}
val orders = sc.textFile(inputPath + "/orders")
val orderItems = sc.textFile(inputPath + "/order_items")
val products = sc.textFile(inputPath + "/products")
val categories = sc.textFile(inputPath + "/categories")
val departments = sc.textFile(inputPath + "/departments")

val departmentsMap = departments.
  map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
val categoriesMap = categories.
  map(rec => (rec.split(",")(0).toInt, rec.split(",")(1).toInt))
val productsMap = products.
  map(rec => (rec.split(",")(1).toInt, rec.split(",")(0).toInt))

val productCategories = productsMap.join(categoriesMap)

val productCategoriesMap = productCategories.
  map(rec => (rec._2._2, rec._2._1))

val productDepartments = productCategoriesMap.join(departmentsMap)

val productDepartmentsMap = productDepartments.
  map(rec => (rec._2._1, rec._2._2)).
  distinct

val bv = sc.broadcast(productDepartmentsMap.collectAsMap())
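// tasks read the product -> department map from bv.value locally, avoiding a shuffle join against the three lookup tables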

val ordersCompleted = orders.
  filter(rec => (rec.split(",")(3) == "COMPLETE")).
  map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))

val orderItemsMap = orderItems.
  map(rec =>
    (rec.split(",")(1).toInt, (bv.value.get(rec.split(",")(2).toInt).get,
      rec.split(",")(4).toFloat)))

val ordersJoin = ordersCompleted.join(orderItemsMap)
val revenuePerDayPerDepartment = ordersJoin.
  map(rec => ((rec._2._1, rec._2._2._1), rec._2._2._2)).
  reduceByKey((acc, value) => acc + value)

revenuePerDayPerDepartment.sortByKey().saveAsTextFile(outputPath)

}
}

hadoop fs -tail /user/paramesh/outputex18/part-00000
97)
((2014-01-14 00:00:00.0,Footwear),5007.26)
((2014-01-14 00:00:00.0,Golf),4599.83)
((2014-01-14 00:00:00.0,Outdoors),1201.71)
((2014-01-15 00:00:00.0,Apparel),9448.74)
((2014-01-15 00:00:00.0,Fan Shop),16327.424)
((2014-01-15 00:00:00.0,Fitness),877.89)
((2014-01-15 00:00:00.0,Footwear),6374.2397)
((2014-01-15 00:00:00.0,Golf),3729.7698)
((2014-01-15 00:00:00.0,Outdoors),585.77)
((2014-01-16 00:00:00.0,Apparel),5699.26)
((2014-01-16 00:00:00.0,Fan Shop),15538.3125)
((2014-01-16 00:00:00.0,Fitness),534.9)
((2014-01-16 00:00:00.0,Footwear),3315.66)
((2014-01-16 00:00:00.0,Golf),3819.8198)
((2014-01-16 00:00:00.0,Outdoors),591.9)
((2014-01-17 00:00:00.0,Apparel),5069.3296)
((2014-01-17 00:00:00.0,Fan Shop),11498.59)
((2014-01-17 00:00:00.0,Footwear),2155.7)
((2014-01-17 00:00:00.0,Golf),3619.7202)
((2014-01-17 00:00:00.0,Outdoors),807.81)
((2014-01-18 00:00:00.0,Apparel),4569.3896)
((2014-01-18 00:00:00.0,Fan Shop),11998.581)
((2014-01-18 00:00:00.0,Fitness),399.96)
((2014-01-18 00:00:00.0,Footwear),2644.73)


#4

package com.scala.avgrvn

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object BroadCast {
def main(args: Array[String]) {

val conf = new SparkConf()
val sc = new SparkContext(conf)

val departments = sc.textFile("/user/saswat232/retail_db/departments")
val categories = sc.textFile("/user/saswat232/retail_db/categories")
val products = sc.textFile("/user/saswat232/retail_db/products")

val deptMap = departments.map( rec => (rec.split(",")(0), rec) )
val catMap = categories.map( rec => (rec.split(",")(1), rec) )

val DeptCatJoin = deptMap.join(catMap)

val JoinMap = DeptCatJoin.map(rec => (rec._2._2.split(",")(0), rec._2._1.split(",")(1)))
val productsMap = products.map( rec => (rec.split(",")(1), rec.split(",")(0)) )

val BroadJoin = JoinMap.join(productsMap)

val BroadMap = BroadJoin.map(rec => (rec._2._2, rec._2._1))

val BrCsVr = sc.broadcast(BroadMap.collectAsMap())
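// BrCsVr holds product_id -> department_name, built by joining departments, categories and products; it is small enough to collect and broadcast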

val order_items = sc.textFile("/user/saswat232/retail_db/order_items")
val orders = sc.textFile("/user/saswat232/retail_db/orders")

val OrItMap = order_items.map(rec => (rec.split(",")(1),(BrCsVr.value.get(rec.split(",")(2)).get,rec.split(",")(4))))
val orderscmpl = orders.filter(rec => (rec.split(",")(3) == "COMPLETE"))
val ordersMap = orderscmpl.map(rec => (rec.split(",")(0), rec.split(",")(1)))

val orItJoin = ordersMap.join(OrItMap).map(rec => ((rec._2._1, rec._2._2._1), rec._2._2._2.toFloat))
val revperdpt = orItJoin.reduceByKey((a, b) => a + b)

revperdpt.saveAsTextFile("/user/saswat232/retail_db/perdeptdaterev")
}
}

Output Data:

((2013-12-01 00:00:00.0,Golf),5539.76)
((2014-06-23 00:00:00.0,Golf),2414.8901)
((2013-11-18 00:00:00.0,Apparel),6449.17)
((2013-11-18 00:00:00.0,Outdoors),415.83997)
((2013-12-16 00:00:00.0,Fan Shop),4199.59)
((2013-11-08 00:00:00.0,Footwear),2763.6199)
((2013-11-25 00:00:00.0,Outdoors),706.85004)
((2013-08-22 00:00:00.0,Outdoors),922.79)
((2013-09-08 00:00:00.0,Apparel),6759.0596)
((2013-08-20 00:00:00.0,Fitness),209.97)
((2013-11-20 00:00:00.0,Fan Shop),14847.9)
((2014-07-20 00:00:00.0,Outdoors),984.67993)
((2013-08-23 00:00:00.0,Apparel),5439.28)
((2013-12-18 00:00:00.0,Footwear),2878.7)
((2014-01-21 00:00:00.0,Fitness),201.96)
((2013-10-11 00:00:00.0,Outdoors),1181.72)
((2013-12-19 00:00:00.0,Fitness),149.96)
((2014-06-03 00:00:00.0,Footwear),4659.54)
((2013-11-18 00:00:00.0,Fitness),22.0)
((2014-04-20 00:00:00.0,Outdoors),1227.73)
((2013-11-03 00:00:00.0,Footwear),6389.37)


#5
val departments=sc.textFile("/public/retail_db/departments")

val categories=sc.textFile("/public/retail_db/categories")

val products=sc.textFile("/public/retail_db/products")


val deptMap=departments.map(rec=>(rec.split(",")(0),rec.split(",")(1)))

val categoryMap=categories.map(rec=>(rec.split(",")(1),rec.split(",")(0)))

val categoryJoinDept=categoryMap.join(deptMap).distinct.map(rec=>rec._2)

//the output will be (categoryId,deptName)

val productMap=products.map(rec=>(rec.split(",")(1),rec.split(",")(0)))

//(product_category_id,product_id)

val productJoinCategory_dept=productMap.join(categoryJoinDept)

//count:1081
//(category_id,(productId,deptName))


val broadcastRDD=productJoinCategory_dept.map(rec=>rec._2)

//(productId,deptName)

val bv = sc.broadcast(broadcastRDD.collectAsMap())

//created a broadcast variable

val orders=sc.textFile("/public/retail_db/orders")

val order_items=sc.textFile("/public/retail_db/order_items")

val completedOrdersAcc = sc.accumulator(0, "Completed Orders")

val completedOrders=orders.filter(rec => { 
    if(rec.split(",")(3)=="COMPLETE") {
      completedOrdersAcc += 1
    }
    rec.split(",")(3) == "COMPLETE"
  }
)


scala> completedOrdersAcc.value
res53: Int = 22899
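
Note: the accumulator reads 22899 here only because an action has already evaluated completedOrders; accumulator updates made inside transformations are not guaranteed exactly-once and can be re-applied if tasks are retried.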


val completedOrdersMap=completedOrders.map(rec=>(rec.split(",")(0),rec.split(",")(1)))
//(order_id,date)

val orderItemsMap=order_items.map(rec=>(rec.split(",")(1),(rec.split(",")(2),rec.split(",")(4).toFloat)))
//(order_item_order_id,(product_id,sub_total))


val ordersJoinOrderItems=completedOrdersMap.join(orderItemsMap)

//(order_id,(date,(productId,price)))

val ordersParsedOrderItems = ordersJoinOrderItems.map(rec=>(rec._2._1,rec._2._2._1,rec._2._2._2))

//This will contain (date,productId,price)
//a .distinct here would collapse legitimate repeated line items (same date, product and price) and understate revenue, so it is omitted

val ordersDeptOrderItems = ordersParsedOrderItems.map(rec=>(rec._1,rec._3,bv.value.get(rec._2).get))

//(date,sub_total,deptName)

val revenuePerDayPerDeptRDD = ordersDeptOrderItems.map(rec=>((rec._1,rec._3),rec._2))

//((date,deptName),sub_total)

val revenuePerDayPerDeptAgg = revenuePerDayPerDeptRDD.reduceByKey(_+_)

revenuePerDayPerDeptAgg.sortByKey().take(10).foreach(println)

((2013-07-25 00:00:00.0,Apparel),3279.57)
((2013-07-25 00:00:00.0,Fan Shop),9798.689)
((2013-07-25 00:00:00.0,Fitness),394.93)
((2013-07-25 00:00:00.0,Footwear),3899.6099)
((2013-07-25 00:00:00.0,Golf),2029.72)
((2013-07-25 00:00:00.0,Outdoors),627.8)
((2013-07-26 00:00:00.0,Apparel),8828.75)
((2013-07-26 00:00:00.0,Fan Shop),20847.688)
((2013-07-26 00:00:00.0,Fitness),183.98001)
((2013-07-26 00:00:00.0,Footwear),5129.42)


#6

Complete program:

package training

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import com.typesafe.config.ConfigFactory

// Case classes for the retail_db schema (assumed; the original post did not include them)
case class Orders(id: Int, date: String, customerId: Int, status: String)
case class OrderItems(id: Int, orderId: Int, productId: Int, quantity: Int, subtotal: Float, productPrice: Float)
case class Products(id: Int, categoryId: Int, name: String, description: String, price: Float, image: String)
case class Categories(id: Int, departmentId: Int, name: String)
case class Departments(id: Int, name: String)

object FiveJoin {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val master = "local[4]"

    val conf = new SparkConf().setMaster(master).setAppName("Spark on Retail_db")
    val sc = new SparkContext(conf)

    val inputPath = "hdfs://quickstart:8020/user/analytics/retail_db" //args(0)

    val orders = sc.textFile(inputPath + "/orders").flatMap { each =>
      each.split(",") match {
        case Array(id, date, customerId, status) => Some(Orders(id.toInt, date, customerId.toInt, status))
        case _ => None
      }
    }
    val orderItems = sc.textFile(inputPath + "/order_items").flatMap { each =>
      each.split(",") match {
        case Array(id, orderId, productId, quantity, subtotal, productPrice) => Some(OrderItems(id.toInt, orderId.toInt, productId.toInt, quantity.toInt, subtotal.toFloat, productPrice.toFloat))
        case _ => None
      }
    }

    val products = sc.textFile(inputPath + "/products").flatMap { each =>
      each.split(",") match {
        case Array(id, category_id, name, description, price, image) => Some(Products(id.toInt, category_id.toInt, name, description, price.toFloat, image))
        case _ => None
      }
    }

    val categories = sc.textFile(inputPath + "/categories").flatMap { each =>
      each.split(",") match {
        case Array(id, departmentId, name) => Some(Categories(id.toInt, departmentId.toInt, name))
        case _ => None
      }
    }

    val departments = sc.textFile(inputPath + "/departments").flatMap { each =>
      each.split(",") match {
        case Array(id, name) => Some(Departments(id.toInt, name))
        case _ => None
      }
    }

    val mappedDepartments = departments.map { department => (department.id, department.name) }
    val mappedCategories = categories.map { category => (category.id, category.departmentId) }
    val mappedProducts = products.map { product => (product.categoryId, product.id) }

    val productsJoinCategories = mappedProducts.join(mappedCategories)

    val mappedProductJoinCategories = productsJoinCategories.map(x => (x._2._2, x._2._1)) //(departmentid, product_id)

    val productsJoinDepartment = mappedProductJoinCategories.join(mappedDepartments) //(departmentid, (productId, departmentName))

    val mappedProductsJoinDepartment = productsJoinDepartment.map(x => (x._2._1, x._2._2)) // (productid, departmentName)

    val broadcastVariable = sc.broadcast(mappedProductsJoinDepartment.collectAsMap()) //(productid, departmentName)

    val completedOrders = orders.filter(_.status.contains("COMPLETE"))

    val mappedOrders = completedOrders.map { order => (order.id, order.date) }

    val orderItemsMapped = orderItems.map { orderItems => (orderItems.orderId, (broadcastVariable.value.get(orderItems.productId).get, orderItems.subtotal)) } // (order_item_order_id, (department_name, subtotal))

    val ordersJoinOrderItems = mappedOrders.join(orderItemsMapped) // (order_id, (order_date,(department_name,subtotal)))

    val orderDateByDepartment = ordersJoinOrderItems.map(x => ((x._2._1, x._2._2._1), x._2._2._2))

    val revenuePerDayPerDepartment = orderDateByDepartment.reduceByKey(_ + _).sortBy({ x => x._2 }, ascending = false)

    revenuePerDayPerDepartment.map(x => "Order-Date: " + x._1._1 + ", DepartmentName: " + x._1._2 + ", Total-Revenue: " + x._2).take(5).foreach(println)

  }
}

Sample data:

Order-Date: 2013-11-03 00:00:00.0, DepartmentName: Fan Shop, Total-Revenue: 31096.723
Order-Date: 2014-04-08 00:00:00.0, DepartmentName: Fan Shop, Total-Revenue: 28994.238
Order-Date: 2014-02-19 00:00:00.0, DepartmentName: Fan Shop, Total-Revenue: 28047.438
Order-Date: 2014-05-16 00:00:00.0, DepartmentName: Fan Shop, Total-Revenue: 27396.836
Order-Date: 2013-12-11 00:00:00.0, DepartmentName: Fan Shop, Total-Revenue: 27207.066

#7

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

object SparkExercise18 {
def main(args: Array[String]) {
val appConf = ConfigFactory.load()
val conf = new SparkConf().setAppName("Sneh Exercise 18").setMaster(appConf.getConfig(args(2)).getString("deployment"))
val sc = new SparkContext(conf)

val inputPath = args(0)
val outputPath = args(1)

val fs = FileSystem.get(sc.hadoopConfiguration)
val inputPathExists = fs.exists(new Path(inputPath))
val outputPathExists = fs.exists(new Path(outputPath))

if (!inputPathExists) {
  println("Input Path does not exists")
  return
}

if (outputPathExists) {
  fs.delete(new Path(outputPath), true)
}

val departments = sc.textFile(inputPath + "/departments")
val categories = sc.textFile(inputPath + "/categories")
val products = sc.textFile(inputPath + "/products")
val orders = sc.textFile(inputPath + "/orders")
val orderItems = sc.textFile(inputPath + "/order_items")

val departmentsMap = departments.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
val categoriesMap = categories.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
val productsMap = products.map(rec => (rec.split(",")(1).toInt, rec.split(",")(0)))

val prodCat = productsMap.join(categoriesMap)
val prodCatMap = prodCat.map(rec => (rec._2._2.toInt, rec._2._1))
val prodDept = prodCatMap.join(departmentsMap).distinct

val prodDeptMap = prodDept.map(rec => (rec._2._1.toInt, rec._2._2))

val bv = sc.broadcast(prodDeptMap.collectAsMap())
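// every executor caches one read-only copy of the product -> department map; the lookups in oiMap below are local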

val ordersCompletedAccum = sc.accumulator(0, "ordersCompleted count")

val ordMap = orders.filter(rec =>{
  if (rec.split(",")(3) == "COMPLETE") {
    ordersCompletedAccum += 1
  }     
  rec.split(",")(3) == "COMPLETE"
  }).map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))

val oiMap = orderItems.map(rec => (rec.split(",")(1).toInt, ((bv.value.get(rec.split(",")(2).toInt).get), rec.split(",")(4).toDouble)))

val ordJoin = ordMap.join(oiMap)

val ordJoinMap = ordJoin.map(rec => ((rec._2._1, rec._2._2._1), rec._2._2._2))

val revenuePerDay = ordJoinMap.reduceByKey(_ + _).sortByKey()

revenuePerDay.saveAsTextFile(outputPath)

sc.stop()

}
}


Sample Data:

((2013-07-25 00:00:00.0,Apparel),3279.57)
((2013-07-25 00:00:00.0,Fan Shop),9798.689999999999)
((2013-07-25 00:00:00.0,Fitness),394.92999999999995)
((2013-07-25 00:00:00.0,Footwear),3899.609999999999)
((2013-07-25 00:00:00.0,Golf),2029.72)
((2013-07-25 00:00:00.0,Outdoors),627.8000000000001)
((2013-07-26 00:00:00.0,Apparel),8828.749999999996)
((2013-07-26 00:00:00.0,Fan Shop),20847.679999999986)
((2013-07-26 00:00:00.0,Fitness),183.98000000000002)
((2013-07-26 00:00:00.0,Footwear),5129.419999999999)
((2013-07-26 00:00:00.0,Golf),6189.37)
((2013-07-26 00:00:00.0,Outdoors),986.6800000000001)
((2013-07-27 00:00:00.0,Apparel),5489.259999999999)
((2013-07-27 00:00:00.0,Fan Shop),16697.979999999992)
((2013-07-27 00:00:00.0,Fitness),234.95)
((2013-07-27 00:00:00.0,Footwear),4477.59)
((2013-07-27 00:00:00.0,Golf),5149.6900000000005)
((2013-07-27 00:00:00.0,Outdoors),1106.74)
((2013-07-28 00:00:00.0,Apparel),6589.169999999998)
((2013-07-28 00:00:00.0,Fan Shop),12298.489999999998)
((2013-07-28 00:00:00.0,Fitness),59.99)
((2013-07-28 00:00:00.0,Footwear),3249.59)
((2013-07-28 00:00:00.0,Golf),4114.81)
((2013-07-28 00:00:00.0,Outdoors),700.8600000000001)
((2013-07-29 00:00:00.0,Apparel),8168.859999999997)
((2013-07-29 00:00:00.0,Fan Shop),23942.369999999984)
((2013-07-29 00:00:00.0,Fitness),319.98)
((2013-07-29 00:00:00.0,Footwear),4644.539999999999)
((2013-07-29 00:00:00.0,Golf),7174.419999999998)
((2013-07-29 00:00:00.0,Outdoors),1648.4800000000002)
((2013-07-30 00:00:00.0,Apparel),9718.659999999994)
((2013-07-30 00:00:00.0,Fan Shop),19647.71999999999)
((2013-07-30 00:00:00.0,Fitness),235.98000000000002)
((2013-07-30 00:00:00.0,Footwear),3472.57)
((2013-07-30 00:00:00.0,Golf),6464.639999999999)
((2013-07-30 00:00:00.0,Outdoors),1050.6399999999999)
((2013-07-31 00:00:00.0,Apparel),9218.789999999997)
((2013-07-31 00:00:00.0,Fan Shop),23197.61999999998)
((2013-07-31 00:00:00.0,Fitness),449.88)
((2013-07-31 00:00:00.0,Footwear),6931.289999999997)
((2013-07-31 00:00:00.0,Golf),5119.57)
((2013-07-31 00:00:00.0,Outdoors),1586.6799999999998)
((2013-08-01 00:00:00.0,Apparel),8458.839999999997)
((2013-08-01 00:00:00.0,Fan Shop),18797.769999999986)
((2013-08-01 00:00:00.0,Fitness),269.95)
((2013-08-01 00:00:00.0,Footwear),3503.6099999999997)
((2013-08-01 00:00:00.0,Golf),6719.459999999999)
((2013-08-01 00:00:00.0,Outdoors),481.78)
((2013-08-02 00:00:00.0,Apparel),7149.029999999998)
((2013-08-02 00:00:00.0,Fan Shop),19597.589999999997)
((2013-08-02 00:00:00.0,Fitness),524.91)
((2013-08-02 00:00:00.0,Footwear),3889.5)
((2013-08-02 00:00:00.0,Golf),4694.69)
((2013-08-02 00:00:00.0,Outdoors),777.72)
((2013-08-03 00:00:00.0,Apparel),7828.91)
((2013-08-03 00:00:00.0,Fan Shop),16348.179999999991)
((2013-08-03 00:00:00.0,Fitness),339.93)
((2013-08-03 00:00:00.0,Footwear),5079.35)
((2013-08-03 00:00:00.0,Golf),4399.6)
((2013-08-03 00:00:00.0,Outdoors),832.74)
((2013-08-04 00:00:00.0,Apparel),4729.41)
((2013-08-04 00:00:00.0,Fan Shop),13698.289999999994)
((2013-08-04 00:00:00.0,Fitness),191.0)
((2013-08-04 00:00:00.0,Footwear),3519.58)
((2013-08-04 00:00:00.0,Golf),3429.8300000000004)
((2013-08-04 00:00:00.0,Outdoors),593.86)
((2013-08-05 00:00:00.0,Apparel),4559.45)
((2013-08-05 00:00:00.0,Fan Shop),11798.939999999997)
((2013-08-05 00:00:00.0,Footwear),3299.67)
((2013-08-05 00:00:00.0,Golf),4789.610000000001)
((2013-08-05 00:00:00.0,Outdoors),1356.6599999999999)
((2013-08-06 00:00:00.0,Apparel),9538.759999999997)
((2013-08-06 00:00:00.0,Fan Shop),20337.619999999988)
((2013-08-06 00:00:00.0,Fitness),299.95)
((2013-08-06 00:00:00.0,Footwear),3995.46)
((2013-08-06 00:00:00.0,Golf),5749.379999999999)
((2013-08-06 00:00:00.0,Outdoors),1492.6200000000001)
((2013-08-07 00:00:00.0,Apparel),7119.069999999999)
((2013-08-07 00:00:00.0,Fan Shop),15247.999999999996)
((2013-08-07 00:00:00.0,Fitness),299.99)
((2013-08-07 00:00:00.0,Footwear),3171.6400000000003)
((2013-08-07 00:00:00.0,Golf),4959.51)
((2013-08-07 00:00:00.0,Outdoors),734.8900000000001)
((2013-08-08 00:00:00.0,Apparel),6599.15)
((2013-08-08 00:00:00.0,Fan Shop),12248.839999999997)
((2013-08-08 00:00:00.0,Fitness),225.99)
((2013-08-08 00:00:00.0,Footwear),2769.71)
((2013-08-08 00:00:00.0,Golf),4234.710000000001)
((2013-08-08 00:00:00.0,Outdoors),1280.67)
((2013-08-09 00:00:00.0,Apparel),4449.41)
((2013-08-09 00:00:00.0,Fan Shop),12498.859999999995)
((2013-08-09 00:00:00.0,Fitness),409.99)
((2013-08-09 00:00:00.0,Footwear),1319.88)
((2013-08-09 00:00:00.0,Golf),2639.84)
((2013-08-09 00:00:00.0,Outdoors),773.97)
((2013-08-10 00:00:00.0,Apparel),6789.09)
((2013-08-10 00:00:00.0,Fan Shop),19097.90999999999)
((2013-08-10 00:00:00.0,Fitness),192.97)
((2013-08-10 00:00:00.0,Footwear),4999.5)
((2013-08-10 00:00:00.0,Golf),6849.46)
((2013-08-10 00:00:00.0,Outdoors),1109.79)
((2013-08-11 00:00:00.0,Apparel),4229.51)
((2013-08-11 00:00:00.0,Fan Shop),11448.809999999998)
((2013-08-11 00:00:00.0,Fitness),299.99)
((2013-08-11 00:00:00.0,Footwear),1695.81)
((2013-08-11 00:00:00.0,Golf),2809.71)
((2013-08-11 00:00:00.0,Outdoors),818.86)
((2013-08-12 00:00:00.0,Apparel),8888.809999999998)
((2013-08-12 00:00:00.0,Fan Shop),18977.47999999999)
((2013-08-12 00:00:00.0,Fitness),74.97)
((2013-08-12 00:00:00.0,Footwear),4719.48)
((2013-08-12 00:00:00.0,Golf),7189.61)
((2013-08-12 00:00:00.0,Outdoors),1288.7600000000002)
((2013-08-13 00:00:00.0,Apparel),2809.66)
((2013-08-13 00:00:00.0,Fan Shop),5849.240000000001)
((2013-08-13 00:00:00.0,Fitness),110.0)
((2013-08-13 00:00:00.0,Footwear),1199.88)
((2013-08-13 00:00:00.0,Golf),2399.8)
((2013-08-13 00:00:00.0,Outdoors),99.95)
((2013-08-14 00:00:00.0,Apparel),6569.15)
((2013-08-14 00:00:00.0,Fan Shop),15848.37999999999)
((2013-08-14 00:00:00.0,Footwear),2899.71)
((2013-08-14 00:00:00.0,Golf),4039.74)
((2013-08-14 00:00:00.0,Outdoors),403.9)
((2013-08-15 00:00:00.0,Apparel),8508.849999999997)
((2013-08-15 00:00:00.0,Fan Shop),18518.30999999999)
((2013-08-15 00:00:00.0,Footwear),4599.49)
((2013-08-15 00:00:00.0,Golf),4949.67)
((2013-08-15 00:00:00.0,Outdoors),896.8)
((2013-08-16 00:00:00.0,Apparel),4879.35)
((2013-08-16 00:00:00.0,Fan Shop),11199.239999999996)
((2013-08-16 00:00:00.0,Fitness),399.95)
((2013-08-16 00:00:00.0,Footwear),5561.669999999999)
((2013-08-16 00:00:00.0,Golf),3099.7000000000003)
((2013-08-16 00:00:00.0,Outdoors),407.83)
((2013-08-17 00:00:00.0,Apparel),12158.439999999995)
((2013-08-17 00:00:00.0,Fan Shop),26897.279999999977)
((2013-08-17 00:00:00.0,Fitness),208.96)
((2013-08-17 00:00:00.0,Footwear),5678.389999999999)
((2013-08-17 00:00:00.0,Golf),6799.2)
((2013-08-17 00:00:00.0,Outdoors),1198.67)
((2013-08-18 00:00:00.0,Apparel),6719.159999999997)
((2013-08-18 00:00:00.0,Fan Shop),14405.659999999998)
((2013-08-18 00:00:00.0,Fitness),399.95)
((2013-08-18 00:00:00.0,Footwear),3291.62)
((2013-08-18 00:00:00.0,Golf),5369.67)
((2013-08-18 00:00:00.0,Outdoors),1118.53)
((2013-08-19 00:00:00.0,Apparel),1449.8400000000001)
((2013-08-19 00:00:00.0,Fan Shop),5199.46)
((2013-08-19 00:00:00.0,Fitness),224.95)
((2013-08-19 00:00:00.0,Footwear),1599.8400000000001)
((2013-08-19 00:00:00.0,Golf),2129.78)
((2013-08-20 00:00:00.0,Apparel),6649.089999999998)
((2013-08-20 00:00:00.0,Fan Shop),17367.89999999999)
((2013-08-20 00:00:00.0,Fitness),209.97)
((2013-08-20 00:00:00.0,Footwear),4799.43)
((2013-08-20 00:00:00.0,Golf),3344.75)
((2013-08-20 00:00:00.0,Outdoors),308.90999999999997)
((2013-08-21 00:00:00.0,Apparel),3679.4799999999996)
((2013-08-21 00:00:00.0,Fan Shop),10148.92)
((2013-08-21 00:00:00.0,Fitness),599.96)
((2013-08-21 00:00:00.0,Footwear),2437.71)
((2013-08-21 00:00:00.0,Golf),2559.8100000000004)
((2013-08-21 00:00:00.0,Outdoors),900.7900000000002)
((2013-08-22 00:00:00.0,Apparel),6819.049999999999)
((2013-08-22 00:00:00.0,Fan Shop),13238.149999999994)
((2013-08-22 00:00:00.0,Fitness),88.0)
((2013-08-22 00:00:00.0,Footwear),5585.41)
((2013-08-22 00:00:00.0,Golf),3889.5900000000006)
((2013-08-22 00:00:00.0,Outdoors),922.7900000000001)
((2013-08-23 00:00:00.0,Apparel),5439.279999999999)
((2013-08-23 00:00:00.0,Fan Shop),13498.039999999995)
((2013-08-23 00:00:00.0,Fitness),129.97)
((2013-08-23 00:00:00.0,Footwear),3299.67)
((2013-08-23 00:00:00.0,Golf),2649.8500000000004)
((2013-08-23 00:00:00.0,Outdoors),659.86)
((2013-08-24 00:00:00.0,Apparel),8358.979999999996)
((2013-08-24 00:00:00.0,Fan Shop),18448.029999999988)
((2013-08-24 00:00:00.0,Fitness),239.96)
((2013-08-24 00:00:00.0,Footwear),3446.49)
((2013-08-24 00:00:00.0,Golf),8144.56)
((2013-08-24 00:00:00.0,Outdoors),928.7400000000001)
((2013-08-25 00:00:00.0,Apparel),4709.389999999999)
((2013-08-25 00:00:00.0,Fan Shop),12298.369999999997)
((2013-08-25 00:00:00.0,Fitness),161.96)
((2013-08-25 00:00:00.0,Footwear),2119.8)
((2013-08-25 00:00:00.0,Golf),4299.650000000001)
((2013-08-25 00:00:00.0,Outdoors),587.81)
((2013-08-26 00:00:00.0,Apparel),5339.389999999999)
((2013-08-26 00:00:00.0,Fan Shop),18457.72999999999)
((2013-08-26 00:00:00.0,Fitness),329.95)
((2013-08-26 00:00:00.0,Footwear),3564.57)
((2013-08-26 00:00:00.0,Golf),3814.84)
((2013-08-26 00:00:00.0,Outdoors),1257.71)
((2013-08-27 00:00:00.0,Apparel),5939.219999999999)
((2013-08-27 00:00:00.0,Fan Shop),13857.999999999995)
((2013-08-27 00:00:00.0,Fitness),375.95000000000005)
((2013-08-27 00:00:00.0,Footwear),3219.6600000000003)
((2013-08-27 00:00:00.0,Golf),5334.67)
((2013-08-27 00:00:00.0,Outdoors),1189.6800000000003)
((2013-08-28 00:00:00.0,Apparel),5239.289999999999)
((2013-08-28 00:00:00.0,Fan Shop),6995.0)
((2013-08-28 00:00:00.0,Footwear),988.8599999999999)
((2013-08-28 00:00:00.0,Golf),1509.91)
((2013-08-28 00:00:00.0,Outdoors),775.7700000000001)
((2013-08-29 00:00:00.0,Apparel),6339.199999999998)
((2013-08-29 00:00:00.0,Fan Shop),12668.139999999996)
((2013-08-29 00:00:00.0,Fitness),134.97)
((2013-08-29 00:00:00.0,Footwear),5579.42)
((2013-08-29 00:00:00.0,Golf),3849.6000000000004)
((2013-08-29 00:00:00.0,Outdoors),1085.8000000000002)
((2013-08-30 00:00:00.0,Apparel),3469.55)
((2013-08-30 00:00:00.0,Fan Shop),8649.219999999998)


#8

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

object PerDayDeptRevenue {

def main(args: Array[String]) {
val appconf = ConfigFactory.load()
val conf = new SparkConf().setAppName("Dept Revenue").setMaster(appconf.getConfig(args(2)).getString("deployement"))
val sc = new SparkContext(conf)

val fs = FileSystem.get(sc.hadoopConfiguration)

val inputPath = args(0)
val outputPath = args(1)

val inputPathExists = fs.exists(new Path(inputPath))
val outputPathExists = fs.exists(new Path(outputPath))

if (!inputPathExists) {
  println("Input Path does not exists")
  return
}

if (outputPathExists) {
  fs.delete(new Path(outputPath), true)
} 

val orders = sc.textFile(inputPath + "/orders")
val orderItems = sc.textFile(inputPath + "/order_items")
val products = sc.textFile(inputPath + "/products")

val categories = sc.textFile(inputPath + "/categories")
val departments = sc.textFile(inputPath + "/departments")

val departmentsMap = departments.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
val categoriesMap = categories.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1).toInt))
val productsMap = products.map(rec => (rec.split(",")(1).toInt, rec.split(",")(0).toInt))

val joinProCat = productsMap.join(categoriesMap)
val proCatMap = joinProCat.map(rec => (rec._2._2, rec._2._1))

val joinRdd = proCatMap.join(departmentsMap).distinct
val pidDnameMap = joinRdd.map(rec => (rec._2._1, rec._2._2))

val bv = sc.broadcast(pidDnameMap.collectAsMap())

val orderItemssMap = orderItems.map(rec => (rec.split(",")(1).toInt,
  (bv.value.get(rec.split(",")(2).toInt).get,
    rec.split(",")(4).toDouble)))
   
val ordersFilterAccum = sc.accumulator(0, "completed orders count")

val completeOrders = orders.filter(rec => {
 if(rec.split(",")(3) == "COMPLETE"){
   ordersFilterAccum+=1
 }
  rec.split(",")(3) == "COMPLETE"})

val ordersMap = completeOrders.map(rec => (rec.split(",")(0).toInt,
  rec.split(",")(1)))

val JoinOrder = ordersMap.join(orderItemssMap)

val revenueMap = JoinOrder.map(rec => ((rec._2._1, rec._2._2._1), rec._2._2._2))
val revenuePerDayPerDept = revenueMap.reduceByKey(_ + _).sortByKey()
revenuePerDayPerDept.saveAsTextFile(outputPath)

}
}

((2013-07-25 00:00:00.0,Apparel),3279.57)
((2013-07-25 00:00:00.0,Fan Shop),9798.689999999999)
((2013-07-25 00:00:00.0,Fitness),394.92999999999995)
((2013-07-25 00:00:00.0,Footwear),3899.609999999999)
((2013-07-25 00:00:00.0,Golf),2029.72)
((2013-07-25 00:00:00.0,Outdoors),627.8000000000001)
((2013-07-26 00:00:00.0,Apparel),8828.749999999996)
((2013-07-26 00:00:00.0,Fan Shop),20847.679999999986)
((2013-07-26 00:00:00.0,Fitness),183.98000000000002)
((2013-07-26 00:00:00.0,Footwear),5129.419999999999)
((2013-07-26 00:00:00.0,Golf),6189.37)
((2013-07-26 00:00:00.0,Outdoors),986.6800000000001)


#9

Complete Program:

package spark.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

object accumBroadcast {

def main(args: Array[String]) {
val appConf = ConfigFactory.load()
val conf = new SparkConf().setAppName("Mahesh Ex-18").setMaster(appConf.getConfig(args(2)).getString("deployment"))
val sc = new SparkContext(conf)

val inputPath = args(0)
val outputPath = args(1)

val fs = FileSystem.get(sc.hadoopConfiguration)
val inputPathExists = fs.exists(new Path(inputPath))
val outputPathExists = fs.exists(new Path(outputPath))

if (!inputPathExists) {
println("Input path does not exist")
return
}

if (outputPathExists) {
fs.delete(new Path(outputPath), true)
}

val departments = sc.textFile(inputPath + "/departments")
val categories = sc.textFile(inputPath + "/categories")

val products = sc.textFile(inputPath + "/products")
val departmentsMap = departments.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
departmentsMap.take(5).foreach(println) // debug print kept separate so departmentsMap remains an RDD
val categoriesMap = categories.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1).toInt))

val productsMap = products.map(rec => (rec.split(",")(1).toInt, rec.split(",")(0).toInt))
val productCategories = productsMap.join(categoriesMap)

val productCategoriesMap = productCategories.map(rec => (rec._2._2, rec._2._1))
val productDepartments = productCategoriesMap.join(departmentsMap)
val productDepartmentsMap = productDepartments.map(rec => (rec._2._1, rec._2._2)).distinct

val bv = sc.broadcast(productDepartmentsMap.collectAsMap())

val orders = sc.textFile(inputPath + "/orders")
val orderItems = sc.textFile(inputPath + "/order_items")

val ordersCompleted = orders.
filter(rec => (rec.split(",")(3) == "COMPLETE")).
map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
val orderItemsMap = orderItems.
map(rec =>
(rec.split(",")(1).toInt, (bv.value.get(rec.split(",")(2).toInt).get, rec.split(",")(4).toFloat)))

val ordersJoin = ordersCompleted.join(orderItemsMap)
val revenuePerDayPerDepartment = ordersJoin.
map(rec => ((rec._2._1, rec._2._2._1), rec._2._2._2)).reduceByKey((acc, value) => acc + value)

revenuePerDayPerDepartment.sortByKey().saveAsTextFile(outputPath)

}
}

Output:

((2013-07-25 00:00:00.0,Apparel),3279.5698)
((2013-07-25 00:00:00.0,Fan Shop),9798.689)
((2013-07-25 00:00:00.0,Fitness),394.93)
((2013-07-25 00:00:00.0,Footwear),3899.61)
((2013-07-25 00:00:00.0,Golf),2029.72)


#10

Complete program:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.typesafe.config._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

object TotalRevPerDay {

  def main(args: Array[String]) {

    val config = ConfigFactory.load()
    val conf = new SparkConf().setAppName("Total Revenue Per Day").setMaster(config.getConfig(args(0)).getString("deployment"))

    val sc = new SparkContext(conf)
    val fs = FileSystem.get(sc.hadoopConfiguration)

    val inputpath1 = new Path(args(1))
    val inputpath2 = new Path(args(2))
    val inputpath3 = new Path(args(3))
    val inputpath4 = new Path(args(4))
    val inputpath5 = new Path(args(5))

    val outputpath = new Path(args(6))

    val inputPath1Exists = fs.exists(inputpath1) // departments
    val inputPath2Exists = fs.exists(inputpath2) //categories
    val inputPath3Exists = fs.exists(inputpath3) //products
    val inputPath4Exists = fs.exists(inputpath4) //order_items
    val inputPath5Exists = fs.exists(inputpath5) //orders

    val outputPathExists = fs.exists(outputpath)

    if (!inputPath1Exists) {
      println("Input path 1 does not exist")
      return
    }

    if (!inputPath2Exists) {
      println("Input path 2 does not exist")
      return
    }

    if (!inputPath3Exists) {
      println("Input path 3 does not exist")
      return
    }

    if (!inputPath4Exists) {
      println("Input path 4 does not exist")
      return
    }

    if (!inputPath5Exists) {
      println("Input path 5 does not exist")
      return
    }

    if (outputPathExists) {
      fs.delete(outputpath, true)
    }

    val loadDep = sc.textFile(inputpath1.toString()) //departments
    val loadCat = sc.textFile(inputpath2.toString()) //categories
    val loadPro = sc.textFile(inputpath3.toString()) //products
    val loadOi = sc.textFile(inputpath4.toString()) //order_items
    val loadO = sc.textFile(inputpath5.toString()) //orders

    val loadDepCatMap = loadDep.map(rec => (rec.split(",")(0), rec))
    val loadCatDepMap = loadCat.map(rec => (rec.split(",")(1), rec))
    val joinCatDep = loadCatDepMap.join(loadDepCatMap)
    val joinCatDepMap = joinCatDep.map(rec => (rec._2._1.split(",")(0), rec._2._2.split(",")(1)))
    val loadProMap = loadPro.map(rec => (rec.split(",")(1), rec.split(",")(0)))
    val joinCatDepMapProMap = joinCatDepMap.join(loadProMap)
    val joinCatDepMapProMapMap = joinCatDepMapProMap.map(rec => (rec._2._2.toInt, rec._2._1))
    val bv = sc.broadcast(joinCatDepMapProMapMap.collectAsMap())
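    // bv ships the small product_id -> department_name map to every executor once; tasks read it via bv.value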
    val loadOMap = loadO.filter(rec => (rec.split(",")(3) == "COMPLETE")).map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
    val loadOiMap = loadOi.map(rec => (rec.split(",")(1).toInt, (bv.value.get(rec.split(",")(2).toInt).get, rec.split(",")(4).toFloat)))
    val joinOMapOiMap = loadOMap.join(loadOiMap)
    val RevPerDayPerDep = joinOMapOiMap.map(rec => ((rec._2._1, rec._2._2._1), rec._2._2._2.toFloat)).reduceByKey((acc, value) => (acc + value))
    
    RevPerDayPerDep.saveAsTextFile(outputpath.toString())
  }

}

Output:

((2013-12-01 00:00:00.0,Golf),5539.76)
((2014-06-23 00:00:00.0,Golf),2414.89)
((2013-11-18 00:00:00.0,Apparel),6449.17)
((2013-11-18 00:00:00.0,Outdoors),415.84003)
((2013-12-16 00:00:00.0,Fan Shop),4199.59)
((2013-11-08 00:00:00.0,Footwear),2763.62)
((2013-11-25 00:00:00.0,Outdoors),706.85004)
((2013-08-22 00:00:00.0,Outdoors),922.79)
((2013-09-08 00:00:00.0,Apparel),6759.0596)
((2014-07-20 00:00:00.0,Outdoors),984.68)

#11

package spark.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

class helloworld{
def getMethod(){
println("hello");
}
}

object helloworld {

def log(msg: String): Unit = {
println(msg);
}

//var departments : org.apache.spark.rdd.RDD =

def load(inputpath: String, sc: SparkContext): Unit = {

//hellowworld.get =sc.textFile(inputpath + "departments")
//val categories=sc.textFile(inputpath + "categories")
//val products=sc.textFile(inputpath +"products")

}

def main(args: Array[String]): Unit = {

   log("loading ")
val appConf = ConfigFactory.load()
val conf = new SparkConf().setAppName("Arun Exercise 18").setMaster(appConf.getConfig(args(2)).getString("deployment"))
val sc = new SparkContext(conf)

val inputPath = args(0)
val outputPath = args(1)

val fs = FileSystem.get(sc.hadoopConfiguration)
val inputPathExists = fs.exists(new Path(inputPath))
val outputPathExists = fs.exists(new Path(outputPath))

if (!inputPathExists) {
  log("input path doesnot exists")
  sys.exit(1)
}
if (outputPathExists) {
  fs.delete(new Path(outputPath), true)
  log("creating new outputpath as it exists")
}

val departments = sc.textFile(inputPath + "/departments")
val categories = sc.textFile(inputPath + "/categories")
val products = sc.textFile(inputPath + "/products")

//join department and categories

val departmentMap = departments.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))

val categoriesMap = categories.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1).toInt))

val productsMap = products.map(rec => (rec.split(",")(1).toInt, rec.split(",")(0).toInt))

val productCategories = productsMap.join(categoriesMap)

val productCategoriesMap = productCategories.map(rec => (rec._2._2, rec._2._1))

val productDepartments = productCategoriesMap.join(departmentMap)

val productDepartmentsMap = productDepartments.map(rec => (rec._2._1, rec._2._2)).distinct()

val bv = sc.broadcast(productDepartmentsMap.collectAsMap())

//load orders and orderitems

val orders = sc.textFile(inputPath + "/orders")
val orderItems = sc.textFile(inputPath + "/order_items")

//filter only completed

val ordersCompleted = orders.filter(rec => (rec.split(",")(3) == "COMPLETE"))
  .map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))

val orderItemsMap = orderItems.
  map(rec =>
    (rec.split(",")(1).toInt, (bv.value.get(rec.split(",")(2).toInt).get,
      rec.split(",")(4).toFloat)))

val ordersJoin = ordersCompleted.join(orderItemsMap)

val revenuePerDayPerDepartment = ordersJoin.
  map(rec => ((rec._2._1, rec._2._2._1), rec._2._2._2)).
  reduceByKey((acc, value) => acc + value)

revenuePerDayPerDepartment.sortByKey().saveAsTextFile(outputPath)

}

}
