Exercise 05 - Develop Spark application using Scala to get daily revenue per department


Description

  • Let us write a program using join and aggregate functions
  • Understand data model
  • orders – for order_date and order_status
  • order_items – for revenue
  • products
  • categories
  • departments

Problem Statement

  • Use sbt with Eclipse to develop this
  • Get all the completed or closed orders, then compute the total revenue for each day and each department
  • Print order_date, department_name and order_revenue
  • Use the Scala interpreter (REPL) to preview the data after each step using Spark APIs
  • Develop the program using sbt and Eclipse
  • Compile the jar, ship it to the lab and run it there (see the submission sketch after this list)
  • Determine the number of executors used for the run
  • Determine the number of executor tasks used for the run
  • Understand the DAG for each stage
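
A minimal sketch of the build and submission steps, assuming Spark 1.6.x with Scala 2.10 on the lab cluster; the project name, dependency versions, input/output paths, config key and executor settings below are placeholders to adjust while checking the number of executors, the tasks and the DAG for each stage in the Spark UI:

// build.sbt (sketch)
name := "spark-exercise-05"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2"
libraryDependencies += "com.typesafe" % "config" % "1.3.1"

# package the jar, copy it to the gateway node, then submit
# (the last three arguments are input path, output path and the
#  application.conf key used by the solutions below)
sbt package
spark-submit \
  --class SparkExercise05 \
  --master yarn \
  --num-executors 2 \
  --executor-memory 512M \
  target/scala-2.10/spark-exercise-05_2.10-1.0.jar \
  /public/retail_db /user/YOUR_USER/daily_revenue_per_department prod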
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

object SparkExercise05 {
  def main(args: Array[String]) {
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().setAppName("Sneh Exercise New 05").setMaster(appConf.getConfig(args(2)).getString("deployment"))
    val sc = new SparkContext(conf)

    val inputPath = args(0)
    val outputPath = args(1)

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))

    if (!inputPathExists) {
      println("Input Path does not exists")
      return
    }

    if (outputPathExists) {
      fs.delete(new Path(outputPath), true)
    }

    val departments = sc.textFile(inputPath + "/departments")
    val categories = sc.textFile(inputPath + "/categories")
    val products = sc.textFile(inputPath + "/products")
    val orders = sc.textFile(inputPath + "/orders")
    val orderItems = sc.textFile(inputPath + "/order_items")

    val departmentsMap = departments.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
    val categoriesMap = categories.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
    val productsMap = products.map(rec => (rec.split(",")(1).toInt, rec.split(",")(0)))

    val prodCat = productsMap.join(categoriesMap)
    val prodCatMap = prodCat.map(rec => (rec._2._2.toInt, rec._2._1))
    val prodDept = prodCatMap.join(departmentsMap).distinct

    val prodDeptMap = prodDept.map(rec => (rec._2._1.toInt, rec._2._2))

    val oiMap = orderItems.map(rec => (rec.split(",")(1).toInt, (rec.split(",")(2), rec.split(",")(4))))

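    // NOTE: only COMPLETE orders are kept below; the problem statement also asks for CLOSED orders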
    val ordMap = orders.filter(rec => {
      rec.split(",")(3) == "COMPLETE"
    }).map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))

    val oiO = oiMap.join(ordMap)

    val oiOMap = oiO.map(rec => (rec._2._1._1.toInt, (rec._2._2, rec._2._1._2)))

    val ordProd = oiOMap.join(prodDeptMap)
    
    val ordProdMap = ordProd.map(rec=>((rec._2._1._1,rec._2._2),rec._2._1._2.toFloat)).reduceByKey(_+_).sortByKey()
    
    ordProdMap.saveAsTextFile(outputPath)
    
    sc.stop()
  }
}

Hi Durga,

Please validate the output for the above exercise.

((Apparel,2014-02-05 00:00:00.0),7479.013)
((Apparel,2014-02-10 00:00:00.0),10198.656)
((Apparel,2014-04-08 00:00:00.0),13298.33)
((Apparel,2014-01-10 00:00:00.0),10448.628)
((Apparel,2014-01-17 00:00:00.0),6119.1924)
((Apparel,2013-10-22 00:00:00.0),4409.44)
((Apparel,2013-09-01 00:00:00.0),9078.654)
((Fan Shop,2014-01-04 00:00:00.0),14098.809)
((Fan Shop,2013-12-08 00:00:00.0),16948.133)
((Apparel,2014-07-11 00:00:00.0),7289.033)
((Fan Shop,2013-12-04 00:00:00.0),20147.219)
((Fan Shop,2013-12-13 00:00:00.0),12748.366)
((Apparel,2013-09-15 00:00:00.0),5339.3213)
((Apparel,2014-05-19 00:00:00.0),4839.3804)
((Apparel,2013-08-18 00:00:00.0),10678.577)
((Apparel,2013-09-02 00:00:00.0),8678.886)
((Fan Shop,2013-08-17 00:00:00.0),30696.97)
((Apparel,2013-09-14 00:00:00.0),12718.312)
((Apparel,2014-01-27 00:00:00.0),9708.725)
((Fan Shop,2014-04-03 00:00:00.0),29926.088)
((Fan Shop,2014-03-27 00:00:00.0),15548.42)
((Apparel,2013-07-28 00:00:00.0),9218.866)


Where did you run this program? Did you run it on Windows, or did you use the SBT-packaged jar, copy it to the lab and run it there?

I am getting the below exception.

Using Spark’s default log4j profile: org/apache/spark/log4j-defaults.properties
17/02/20 19:01:32 INFO SparkContext: Running Spark version 1.6.2
17/02/20 19:01:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
17/02/20 19:01:32 INFO SecurityManager: Changing view acls to: himanshu.k
17/02/20 19:01:32 INFO SecurityManager: Changing modify acls to: himanshu.k
17/02/20 19:01:32 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(himanshu.k); users with modify permissions: Set(himanshu.k)
17/02/20 19:01:33 INFO Utils: Successfully started service ‘sparkDriver’ on port 54178.
17/02/20 19:01:33 INFO Slf4jLogger: Slf4jLogger started
17/02/20 19:01:33 INFO Remoting: Starting remoting
17/02/20 19:01:33 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@10.129.176.156:54191]
17/02/20 19:01:33 INFO Utils: Successfully started service ‘sparkDriverActorSystem’ on port 54191.
17/02/20 19:01:33 INFO SparkEnv: Registering MapOutputTracker
17/02/20 19:01:33 INFO SparkEnv: Registering BlockManagerMaster
17/02/20 19:01:33 INFO DiskBlockManager: Created local directory at C:\Users\Himanshu.k\AppData\Local\Temp\blockmgr-02bf5f32-f8ff-4c69-b354-860452e464f5
17/02/20 19:01:33 INFO MemoryStore: MemoryStore started with capacity 2.4 GB
17/02/20 19:01:33 INFO SparkEnv: Registering OutputCommitCoordinator
17/02/20 19:01:34 INFO Utils: Successfully started service ‘SparkUI’ on port 4040.
17/02/20 19:01:34 INFO SparkUI: Started SparkUI at http://10.129.176.156:4040
17/02/20 19:01:34 INFO Executor: Starting executor ID driver on host localhost
17/02/20 19:01:34 INFO Utils: Successfully started service ‘org.apache.spark.network.netty.NettyBlockTransferService’ on port 54198.
17/02/20 19:01:34 INFO NettyBlockTransferService: Server created on 54198
17/02/20 19:01:34 INFO BlockManagerMaster: Trying to register BlockManager
17/02/20 19:01:34 INFO BlockManagerMasterEndpoint: Registering block manager localhost:54198 with 2.4 GB RAM, BlockManagerId(driver, localhost, 54198)
17/02/20 19:01:34 INFO BlockManagerMaster: Registered BlockManager
17/02/20 19:01:35 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 107.7 KB, free 107.7 KB)
17/02/20 19:01:35 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.8 KB, free 117.5 KB)
17/02/20 19:01:35 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:54198 (size: 9.8 KB, free: 2.4 GB)
17/02/20 19:01:35 INFO SparkContext: Created broadcast 0 from textFile at TotalRevenue.scala:14
17/02/20 19:01:35 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 107.7 KB, free 225.2 KB)
17/02/20 19:01:35 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 9.8 KB, free 235.0 KB)
17/02/20 19:01:35 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:54198 (size: 9.8 KB, free: 2.4 GB)
17/02/20 19:01:35 INFO SparkContext: Created broadcast 1 from textFile at TotalRevenue.scala:20
17/02/20 19:01:35 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/02/20 19:01:35 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/02/20 19:01:35 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/02/20 19:01:35 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/02/20 19:01:35 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
17/02/20 19:01:35 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:293)
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$33.apply(SparkContext.scala:1015)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$33.apply(SparkContext.scala:1015)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)
at scala.Option.map(Option.scala:146)
at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:176)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:195)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1209)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1154)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1154)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1154)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1060)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:952)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:952)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:952)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:951)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1457)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1436)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1436)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1436)
at TotalRevenue$.main(TotalRevenue.scala:24)
at TotalRevenue.main(TotalRevenue.scala)
Exception in thread "main" java.lang.NullPointerException
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
at org.apache.hadoop.util.Shell.run(Shell.java:379)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
at org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:1097)
at org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:567)
at org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.getPermission(RawLocalFileSystem.java:542)
at org.apache.hadoop.fs.LocatedFileStatus.<init>(LocatedFileStatus.java:42)
at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1815)
at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1797)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:233)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1209)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1154)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1154)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1154)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1060)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:952)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:952)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:952)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:951)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1457)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1436)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1436)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1436)
at TotalRevenue$.main(TotalRevenue.scala:24)
at TotalRevenue.main(TotalRevenue.scala)
17/02/20 19:01:35 INFO SparkContext: Invoking stop() from shutdown hook
17/02/20 19:01:35 INFO SparkUI: Stopped Spark web UI at http://10.129.176.156:4040
17/02/20 19:01:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/02/20 19:01:35 INFO MemoryStore: MemoryStore cleared
17/02/20 19:01:35 INFO BlockManager: BlockManager stopped
17/02/20 19:01:35 INFO BlockManagerMaster: BlockManagerMaster stopped
17/02/20 19:01:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/02/20 19:01:35 INFO SparkContext: Successfully stopped SparkContext
17/02/20 19:01:35 INFO ShutdownHookManager: Shutdown hook called
17/02/20 19:01:35 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
17/02/20 19:01:35 INFO ShutdownHookManager: Deleting directory C:\Users\Himanshu.k\AppData\Local\Temp\spark-99f87787-7895-4bdb-9ad6-0dcbed0c40d4

[quote=“himanshu1989, post:4, topic:2538”]
ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
[/quote]
Your error is mainly because it is not able to locate winutils.exe.
Do you have winutils.exe on your system?
If yes, then set the HADOOP_HOME path as a system environment variable,
or set the environment variable in Eclipse.
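
A minimal sketch of the programmatic form of that workaround, assuming winutils.exe has been placed under C:\hadoop\bin (the path is just an example); setting the hadoop.home.dir system property before the SparkContext is created has the same effect as setting HADOOP_HOME:

import org.apache.spark.{SparkConf, SparkContext}

// C:\hadoop is a placeholder; it must contain bin\winutils.exe
System.setProperty("hadoop.home.dir", "C:\\hadoop")

val conf = new SparkConf().setAppName("Daily Revenue Per Department").setMaster("local")
val sc = new SparkContext(conf)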


/**
  * Created by Ravinder on 3/20/2017.
  */
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.typesafe.config._
import org.apache.hadoop.fs._

object orderRevenu {

def main(args: Array[String]): Unit = {
val props = ConfigFactory.load()
val conf = new SparkConf().
  setAppName("Total Revenue").
  setMaster(props.getConfig(args(0)).getString("deploymentMode"))

val sc = new SparkContext(conf)
val fs = FileSystem.get(sc.hadoopConfiguration)
val inpath = new Path(args(1))
val outpath = new Path(args(2))

val isInExist = fs.exists(inpath)

if (!isInExist) {
  println("Invalid path " + inpath)
  return
}

val isOutExist = fs.exists(outpath)

if (isOutExist) {
  fs.delete(outpath,true)
}
println(inpath)

val ordersRdd = sc.textFile(inpath + "/orders/")
val orderItemsRdd = sc.textFile(inpath + "/order_items/")
val productsRdd = sc.textFile(inpath + "/products/")
val categoriesRdd = sc.textFile(inpath + "/categories/")
val DepartmentRdd = sc.textFile(inpath + "/departments/")

/* val ordersRdd = sc.textFile("/public/retail_db/orders")
val orderItemsRdd = sc.textFile("/public/retail_db/order_items/")
val productsRdd = sc.textFile("/public/retail_db/products/")
val categoriesRdd = sc.textFile("/public/retail_db/categories/")
val DepartmentRdd = sc.textFile("/public/retail_db/departments/")
// COMPLETE,  CLOSED*/


val ordersData =  ordersRdd.filter(rec => ( rec.split(",")(3) == "COMPLETE" ||
  rec.split(",")(3) == "CLOSED" ))

val ordersMap = ordersData.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
val orderItemsMap = orderItemsRdd.map(rec => (rec.split(",")(1).toInt,
  (rec.split(",")(2),rec.split(",")(4).toFloat)))

val ordersJoin =  orderItemsMap.join(ordersMap)
val ordersJoinmap = ordersJoin.map(rec => ((rec._1,rec._2._1._1, rec._2._2),rec._2._1._2))
val ordersJoinReduce = ordersJoinmap.reduceByKey(_+_)

val ordersJoinRevmap = ordersJoinReduce.map(rec => (rec._1._2.toInt, (rec._1._1,rec._1._3,rec._2)))



val prodMap = productsRdd.map(rec => (rec.split(",")(1).toInt, rec.split(",")(0)))

val categMap = categoriesRdd.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))

val prodCat = prodMap.join(categMap)
val prodCatMap = prodCat.map(rec => (rec._2._2.toInt, (rec._1,rec._2._1)))

val DepartmentMap = DepartmentRdd.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))

val prodCatDepMap = DepartmentMap.join(prodCatMap)
val prodDepMap = prodCatDepMap.map(rec => (rec._2._2._2.toInt, rec._2._1))

val ordersRev = ordersJoinRevmap.join(prodDepMap)
val ordersRevFinal = ordersRev.map(rec => (rec._2._1._2,rec._2._2,rec._2._1._3))

ordersRevFinal.saveAsTextFile(args(2))

}
}

/**
  * Created by Admin-pc on 30-04-2017.
  */
import org.apache.spark.{SparkConf, SparkContext}
import com.typesafe.config.ConfigFactory

object perDepartmentRevenue {
  def main(args: Array[String]): Unit = {
    val props = ConfigFactory.load()
    val sparkConf = new SparkConf().setAppName("perCatRev").setMaster(props.getConfig(args(2)).getString("executionMode"))
    val sc = new SparkContext(sparkConf)
    val inputFilePath = args(0)
    val filteredOrderData = sc.textFile(inputFilePath + "/orders/part-00000").filter(rec => {
      if (rec.split(",")(3) == "CLOSED" || rec.split(",")(3) == "COMPLETE") {
        true
      } else {
        false
      }
    }).map(rec => (rec.split(",")(0), rec.split(",")(1)))
    val orderItemData = sc.textFile(inputFilePath + "/order_items/part-00000").map(rec => {
      (rec.split(",")(1), (rec.split(",")(2), rec.split(",")(4)))
    })
    val orderItemJoinMap = filteredOrderData.join(orderItemData).map(rec => (rec._2._2._1, (rec._2._1, rec._2._2._2)))
    val productData = sc.textFile(inputFilePath + "/products/part-00000").map(rec => (rec.split(",")(0), rec.split(",")(1)))
    val orderProductJoinCategoryMap = productData.join(orderItemJoinMap).map(rec => (rec._2._1, (rec._2._2._1, rec._2._2._2)))
    val categoryData = sc.textFile(inputFilePath + "/categories/part-00000").map(rec => (rec.split(",")(0), rec.split(",")(1)))
    val productCatJoinDepMap = categoryData.join(orderProductJoinCategoryMap).map(rec => (rec._2._1, (rec._2._2._1, rec._2._2._2)))
    val depData = sc.textFile(inputFilePath + "/departments/part-00000").map(rec => (rec.split(",")(0), rec.split(",")(1)))
    val depCatJoinMapSave = depData.join(productCatJoinDepMap).map(rec => {
      ((rec._2._2._1, rec._2._1), rec._2._2._2.toDouble)
    }).reduceByKey(_ + _).saveAsTextFile(args(1))
  }
}

/**
  * Created by naitikkumar.mehta on 5/11/2017.
  */
import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.fs.{FileSystem, Path}

object DailyRevenuePerDepartment {
def main(args: Array[String]): Unit = {

//Validate if the arguments are provided during execution
if(args.length.equals(0)){
  println("Usage: spark-submit --class className jarLocation inputFilePath outputFilePath dev/prod")
  return
}

// Initialize all the system parameters
val properties = ConfigFactory.load()
val conf = new SparkConf()
  .setAppName("Daily Revenue Per Department")
  .setMaster(properties.getConfig(args(2)).getString("executionMode"))
val sc = new SparkContext(conf)
val fs = FileSystem.get(sc.hadoopConfiguration)

val inputPath = new Path(args(0))
val outputPath = new Path(args(1))

if(!fs.exists(inputPath)){
  println("The input path does not exist. Please provide a valid input path.")
  return
}

if(fs.exists(outputPath)){
  fs.delete(outputPath,true)
}

//RDD[(order_id: Int, order_date: String)]
val orders = sc.textFile(inputPath+"/orders")
  .filter(rec => (rec.split(",")(3) == "COMPLETE" || rec.split(",")(3) == "CLOSED"))
  .map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))

//RDD[(order_item_order_id: Int, (order_item_product_id: Int, order_item_subtotal: Float))]
val orderItems = sc.textFile(inputPath+"/order_items")
  .map(rec => (rec.split(",")(1).toInt,(rec.split(",")(2).toInt,rec.split(",")(4).toFloat)))

//RDD[(product_id: Int, product_category_id: Int)]
val products = sc.textFile(inputPath+"/products")
  .map(rec => (rec.split(",")(0).toInt,rec.split(",")(1).toInt))

//RDD[(category_id: Int, category_department_id: Int)]
val categories = sc.textFile(inputPath+"/categories")
  .map(rec => (rec.split(",")(0).toInt,rec.split(",")(1).toInt))

//RDD[(department_id: Int, department_name: String)]
val departments = sc.textFile(inputPath+"/departments")
  .map(rec => (rec.split(",")(0).toInt,rec.split(",")(1)))

//RDD[(order_item_order_id: Int, ((order_item_product_id: Int, order_item_subtotal: Float), order_date: String))]
val ordersJoin = orderItems.join(orders)
//RDD[(order_item_product_id: Int, (order_date: String, order_item_subtotal: Float))]
val ordersJoinMap = ordersJoin.map(rec => (rec._2._1._1,(rec._2._2,rec._2._1._2)))

//Rolling up from ORDER_ITEMS to CATEGORIES through PRODUCTS
//RDD[(product_category_id: Int, (order_date: String, order_item_subtotal: Float))]
val ordersCategories = ordersJoinMap.join(products).map(rec => (rec._2._2,(rec._2._1)))

//Rolling up from ORDER_ITEMS to DEPARTMENTS through CATEGORIES
//RDD[(category_department_id: Int, (order_date: String, order_item_subtotal: Float))]
val ordersDepartments = ordersCategories.join(categories).map(rec => (rec._2._2,rec._2._1))

//RDD[(order_date+"\t"+department_name: String, order_item_subtotal: Float)]
val ordersFinal = ordersDepartments.join(departments).map(rec => (rec._2._1._1+"\t"+rec._2._2,rec._2._1._2.toFloat))

//Calculate the dailyRevenue with key as date+department_name
//RDD[(String, Float)]
val dailyRevenuePerDepartment = ordersFinal.reduceByKey((a,b) => a+b)

//Publish the data for reporting purposes with date formatted
//RDD[(String, String, Float)]
val reportingData = dailyRevenuePerDepartment
  .map(rec => (rec._1.split("\t")(0).substring(0,10),rec._1.split("\t")(1),rec._2))

reportingData.saveAsTextFile(outputPath+"/dailyRevenuePerDepartment")
sc.stop()

}
}


Yeah, it is correctly done. Thanks.

val orders = sc.textFile("/user/root/sqoopImport/orders")
val order_items = sc.textFile("/user/root/sqoopImport/order_items")
val products = sc.textFile("/user/root/sqoopImport/products")
val categories = sc.textFile("/user/root/sqoopImport/categories")
val departments = sc.textFile("/user/root/sqoopImport/departments")
val catPaired = categories.map(_.split(",")).map(x=>(x(0).toInt,x(1).toInt))
val depPaired = departments.map(_.split(",")).map(x=>(x(0).toInt,x(1)))

val catPaired = categories.map(_.split(",")).map(x=>(x(1).toInt,x(0).toInt)) // cat_dept_id, cat_id
val depPaired = departments.map(_.split(",")).map(x=>(x(0).toInt,x(1))) // dep_id , dep_name
val catDept = catPaired.join(depPaired).map(x=>x._2) //dept_id, (cat_id, dep_name)

val prodPaired = products.map(_.split(",")).map(x=>(x(1).toInt,x(0).toInt)) //prod_cat_id,prod_id
val prodCatJoin = prodPaired.join(catDept).map(x=>x._2) // cat_id, (prod_id,dep_name) => (prod_id,dep_name)

val order_itemPaired = order_items.map(_.split(",")).map(x=>(x(2).toInt,(x(1).toInt,x(4).toFloat))) // prod_id, (oi_oid,oi_subtotal)
val oiProdJoin = order_itemPaired.join(prodCatJoin).map(x=>x._2).map(x=>(x._1._1,(x._1._2,x._2))) // (oi_id,(subtotal,dept_name))

val orderFilterPaired = orders.map(_.split(",")).filter(x=>x(3)=="COMPLETE" || x(3)=="CLOSED").map(x=>(x(0).toInt,x(1))) //(order_id, order_date)
val joinned = orderFilterPaired.join(oiProdJoin).map(_._2).map(x=>((x._1,x._2._2),x._2._1))

joinned.reduceByKey(_ + _).sortByKey().collect().foreach(println)

Hi Sir,

I have executed the task in the three possible ways below and each produced 2136 output rows. Kindly confirm whether the code below is correct.

Output:

USING APIs:

val dept = sc.textFile("/user/cloudera/allin/retail_db/departments/p*")
val departmentsm = dept.map(rec => (rec.split(",")(0).toInt,rec.split(",")(1) ) )

val catg = sc.textFile("/user/cloudera/allin/retail_db/categories/p*")
val categoriesm = catg.map(rec => (rec.split(",")(1).toInt,rec.split(",")(0).toInt ) )

val products = sc.textFile("/user/cloudera/allin/retail_db/products/p*")
val productsm = products.map(rec => (rec.split(",")(0).toInt,rec.split(",")(1).toInt ) )

val ods = sc.textFile("/user/cloudera/allin/retail_db/orders/p*")
val odsf = ods.filter(rec => rec.split(",")(3)=="COMPLETE" || rec.split(",")(3)=="CLOSED" )
val odsfm = odsf.map(rec => (rec.split(",")(0).toInt,rec.split(",")(1) ))

val oir = sc.textFile("/user/cloudera/allin/retail_db/order_items/p*")
val oirm = oir.map(rec => (rec.split(",")(1).toInt, (rec.split(",")(2).toInt,rec.split(",")(4)) ))

val oorditmjoin = oirm.join(odsfm)
val ood = oorditmjoin.map(rec=> (rec._2._1._1, (rec._2._1._2, rec._2._2)))
val depctjoin = categoriesm.join(departmentsm)
val out1 = depctjoin.map(rec => (rec._2._1,rec._2._2) )

val proodjoin = productsm.join(ood)
val out2 = proodjoin.map(rec => (rec._2._1,(rec._2._2._1, rec._2._2._2) ) )

val out3 = out1.join(out2)
val out4 = out3.map(rec => ((rec._2._2._2,rec._2._1 ),rec._2._2._1.toFloat) )
val output = out4.reduceByKey(_ + _)
val finnal = output.map(rec => (rec._1._1, rec._1._2, rec._2) )

USING DATA FRAMES :

case class departments(
department_id: Int,
department_name: String)
val dept = sc.textFile("/user/cloudera/allin/retail_db/departments/p*")
val deptdf = dept.map(rec => {
val b = rec.split(",")
departments(b(0).toInt,b(1).toString )
}).toDF
case class categories(
category_id: Int,
category_department_id: Int,
category_name : String)
val catg = sc.textFile("/user/cloudera/allin/retail_db/categories/p*")
val catgdf = catg.map(rec => {
val b = rec.split(",")
categories(b(0).toInt, b(1).toInt, b(2).toString)
}).toDF

case class Products(
product_id: Int,
product_category_id: String)
val products = sc.textFile("/user/cloudera/allin/retail_db/products/p*")
val productsdf = products.map(rec => {
val b = rec.split(",")
Products(b(0).toInt,b(1).toString)
}).toDF

val ods = sc.textFile("/user/cloudera/allin/retail_db/orders/p*")
val odsf = ods.filter(rec => rec.split(",")(3)=="COMPLETE" || rec.split(",")(3)=="CLOSED" )
val oir = sc.textFile("/user/cloudera/allin/retail_db/order_items/p*")

case class Orderss(
order_id: Int,
order_date: String,
order_customer_id: Int,
order_status : String)
val odsfdf = odsf.map(rec => {
val b = rec.split(",")
Orderss(b(0).toInt, b(1).toString, b(2).toInt, b(3).toString)
}).toDF
odsfdf.registerTempTable("orders")

case class OrderItems(
order_item_id : Int,
order_item_order_id: Int,
order_item_product_id: Int,
order_item_quantity: Int,
order_item_subtotal: Double,
order_item_product_price: Double)
val oirdf = oir.map(rec => {
val c = rec.split(",")
OrderItems(c(0).toInt, c(1).toInt, c(2).toInt, c(3).toInt,c(4).toDouble,c(5).toDouble)
}).toDF
oirdf.registerTempTable("orderitems")

sqlContext.sql("select * from orders limit 10").collect().foreach(println)
val orderjoin = odsfdf.join(oirdf, odsfdf("order_id") === oirdf("order_item_order_id"))
val productjoin = orderjoin.join(productsdf, orderjoin("order_item_product_id") === productsdf("product_id"))
val catgjoin = productjoin.join(catgdf, productjoin("product_category_id") === catgdf("category_id"))
val deptjoin = catgjoin.join(deptdf, catgjoin("category_department_id") === deptdf("department_id"))

sqlContext.setConf("spark.sql.shuffle.partitions", "2")
import org.apache.spark.sql.functions._
deptjoin.groupBy("order_date", "department_name").agg(sum("order_item_subtotal")).show()

HIVE QUERY :
select department_name,sum(order_item_subtotal),order_date
from orders
join order_items on orders.order_id = order_items.order_item_order_id
join products on order_items.order_item_product_id = products.product_id
join categories on products.product_category_id = categories.category_id
join departments on categories.category_department_id = departments.department_id
where order_status = "CLOSED" or order_status = "COMPLETE"
group by order_date, department_name;


Here is my output and DataFrame code.

It worked on the Cloudera VM.
I had the data in /home/cloudera/data-master/retail_db
// it requires only spark-shell

//________________________________________________
var sqlContext = new org.apache.spark.sql.SQLContext(sc)

sqlContext.setConf("spark.sql.shuffle.partitions", "2")

// for defining DF_____________________________________case class defines DF structure
case class Orders(order_id: Int, order_date: String, order_customer_id: Int,order_status: String )

case class Products (product_id: Int, product_category_id: Int)

case class Categories(category_id: Int, category_department_id: Int, category_name: String)

case class Departments(department_id: Int, department_name: String)

case class OrderItems (order_item_id: Int, order_item_order_id: Int, order_item_product_id: Int, order_item_quantity: Int, order_subtotal: Float, order_item_product_price: Float)

//________________________________________________________data frames

var ordersDF = sc.textFile("file:///home/cloudera/data-master/retail_db/orders/").map(x=> { var a = x.split(","); Orders(a(0).toInt, a(1).toString(), a(2).toInt, a(3).toString() )}).toDF()

var orderitemsDF = sc.textFile("file:///home/cloudera/data-master/retail_db/order_items").map(x=>{var a=x.split(","); OrderItems(a(0).toInt,a(1).toInt,a(2).toInt,a(3).toInt,a(4).toFloat,a(5).toFloat)}).toDF()

var productsDF = sc.textFile("file:///home/cloudera/data-master/retail_db"+"/products").map(x=> {
var a = x.split(",")
Products(a(0).toInt, a(1).toInt)

}).toDF()

var categoriesDF = sc.textFile("file:///home/cloudera/data-master/retail_db"+"/categories").map(x=> {
var a = x.split(",")
Categories(a(0).toInt, a(1).toInt, a(2).toString())
}).toDF()

var departmentsDF = sc.textFile("file:///home/cloudera/data-master/retail_db"+"/departments").map(x=> {
var a = x.split(",")
Departments(a(0).toInt, a(1).toString())
}).toDF()

var ordersDFfiltered = ordersDF.filter(ordersDF("order_status") === "COMPLETE")
ordersDF = ordersDFfiltered;

//_______________________________________joining orders, order_items

var ojoin = orderitemsDF.join(ordersDF, ordersDF("order_id") === orderitemsDF("order_item_order_id"))

// for test
//ojoin.groupBy("order_date").agg(sum("order_subtotal")).rdd.collect()

//__________________________joining products, categories, departments
var productcatjoin = productsDF.join(categoriesDF, productsDF("product_category_id") === categoriesDF("category_id"))

var productcatjoindept = productcatjoin.join(departmentsDF, productcatjoin("category_department_id") === departmentsDF("department_id"))

//__________________________________________joining both joins products side and orders side
var productcatjoindeptjoinO = productcatjoindept.join(ojoin, ojoin("order_item_product_id") === productcatjoindept("product_id"))

// showing result (sum comes from org.apache.spark.sql.functions)
import org.apache.spark.sql.functions._
productcatjoindeptjoinO.groupBy("order_date","department_name").agg(sum("order_subtotal")).orderBy("order_date").show()

Can you please post the official code and answer snippet for this? I see a lot of mismatches in people's replies.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object RevenueDepartment {
def main(args:Array[String]){

val conf = new SparkConf().setAppName("RevenueByDepartment").setMaster("local")
val sc =new SparkContext(conf)

val fs = FileSystem.get(sc.hadoopConfiguration)

val inputBashPath = "E:/MyLearning/Hadoop-Itversity/retail_db/"
val outPutPath    = "E:/MyLearning/Hadoop-Itversity/retail_db/RevenueByDepartments"

if(!fs.exists(new Path(inputBashPath))){
  println("Input path is not exists")
  return
}

val outputPathExists = fs.exists(new Path(outPutPath))
if(outputPathExists){
  fs.delete(new Path(outPutPath),true)
}


val orders      = sc.textFile(inputBashPath+"/orders")
val order_items = sc.textFile(inputBashPath+"/order_items")
val products    = sc.textFile(inputBashPath+"/products")
val categories  = sc.textFile(inputBashPath+"/categories")
val departments = sc.textFile(inputBashPath+"departments")

val departmentMap = departments.map(rec => (rec.split(",")(0).toInt,rec.split(",")(1)))
val categoriesMap = categories.map(rec => (rec.split(",")(0).toInt,rec.split(",")(1).toInt))
val productMap    = products.map(rec => (rec.split(",")(1).toInt,rec.split(",")(0).toInt))

val prodCatJoin = productMap.join(categoriesMap) //Join Categories and product table
  //prodCatJoin.take(5).foreach(println)
val prod_deptMap = prodCatJoin.map(rec => (rec._2._2,rec._2._1)) // Get Product department map from above join
  //prod_deptMap.take(5).foreach(println)
val prodDeptJoin = prod_deptMap.join(departmentMap) // Join above map and departments to get department name with id
  //prodDeptJoin.take(5).foreach(println)
val prodDeptJoinMap =prodDeptJoin.map(rec => (rec._2._1.toInt,rec._2._2))

val orderDataFiltered = orders.filter(rec => (rec.split(",")(3)=="COMPLETE") || rec.split(",")(3)=="CLOSED")
val orderMap = orderDataFiltered.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))   // ( OrderId, OrderDate)
   //orderMap.take(2).foreach(println)
    //**(3,2013-07-25 00:00:00.0)
    //**(5,2013-07-25 00:00:00.0)

val orderitemsMap = order_items.map(rec => (rec.split(",")(1).toInt, (rec.split(",")(2), rec.split(",")(4)))) // (orderId, (prodId,Amount)
  //orderitemsMap.take(2).foreach(println)
    //**(1,(957,299.98))
    //**(2,(1073,199.99))

val orderItemsJoin = orderitemsMap.join(orderMap)
  //orderItemsJoin.take(5).foreach(println)
    //**(65722,((365,119.98),2014-05-23 00:00:00.0))
    //**(65722,((730,400.0),2014-05-23 00:00:00.0))

val ProductDateAmountMap = orderItemsJoin.map(rec=> (rec._2._1._1.toInt, (rec._2._2, rec._2._1._2))) // (ProductID, date,amount)
  //ProductDateAmountMap.take(5).foreach(println)
    //** (365,(2014-05-23 00:00:00.0,119.98))
    //**(730,(2014-05-23 00:00:00.0,400.0))

val ordProd = ProductDateAmountMap.join(prodDeptJoinMap)
  //ordProd.take(5).foreach(println)
    //**(226,((2013-08-16 00:00:00.0,599.99),Footwear))
    //**(226,((2013-09-01 00:00:00.0,599.99),Footwear))

val prodDeprtRbk = ordProd.map(rec => ((rec._2._1._1,rec._2._2),rec._2._1._2.toFloat)).reduceByKey(_+_).sortByKey()
  //prodDeprtRbk.take(5).foreach(println)
      //**((2013-07-25 00:00:00.0,Apparel),3279.5698)
      //**((2013-07-25 00:00:00.0,Fan Shop),9798.691)
      //**((2013-07-25 00:00:00.0,Fitness),394.93)
      //**((2013-07-25 00:00:00.0,Footwear),3899.6099)
      //**((2013-07-25 00:00:00.0,Golf),2029.7197)
      //**((2013-07-25 00:00:00.0,Outdoors),627.8)
      //**((2013-07-26 00:00:00.0,Apparel),8828.752)

prodDeprtRbk.saveAsTextFile(outPutPath)

}
}

For people who are trying to get the correct answer, Ashwin's answer should be the correct one, as both order statuses have been included in the condition. In other answers I see that one of the statuses is missing from the where/filter condition.
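
For reference, a minimal sketch of that status filter with both values included, assuming orders is the raw text RDD used in the solutions above:

// keep only completed or closed orders (both statuses, per the problem statement)
val completedOrClosed = orders.filter(rec => {
  val status = rec.split(",")(3)
  status == "COMPLETE" || status == "CLOSED"
})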

Can anyone review the following code? I am getting a NumberFormatException after running the query.

//ORDERS
val orders=sc.textFile("file:///home/cloudera/Desktop/data/retail_db/orders")
case class Order(
order_id: Int,
  order_date: String,
  order_customer_id: Int,
  order_status: String)

val ordersDF=orders.map(rec=>{
val b=rec.split(",") 
Order(b(0).toInt,b(1),b(2).toInt,b(3))
}).toDF

ordersDF.registerTempTable("orders")

//ORDER ITEMS
val order_items=sc.textFile("file:///home/cloudera/Desktop/data/retail_db/order_items")
case class OrderItem(
  order_item_id: Int,
  order_item_order_id: Int,
  order_item_product_id: Int,
  order_item_quantity: Int,
  order_item_subtotal: Float,
  order_item_product_price: Float)

val order_itemsDF=order_items.map(rec=>{
val b=rec.split(",")
OrderItem(b(0).toInt,b(1).toInt,b(2).toInt,b(3).toInt,b(4).toFloat,b(5).toFloat)
}).toDF

order_itemsDF.registerTempTable("order_items")

//PRODUCTS
val products=sc.textFile("file:///home/cloudera/Desktop/data/retail_db/products")
case class Product(
  product_id: Int,
  product_category_id: Int,
  product_name: String,
  product_description: String,
  product_price: Float,
  product_image: String)

val productsDF=products.map(rec=>{
val b=rec.split(",")
Product(b(0).toInt,b(1).toInt,b(2),b(3),b(4).toFloat,b(5))
}).toDF

productsDF.registerTempTable("products")

//CATEGORIES
val categories=sc.textFile("file:///home/cloudera/Desktop/data/retail_db/categories")
case class Category(
  category_id: Int,
  category_department_id: Int,
  category_name: String)

val categoriesDF=categories.map(rec=>{
val b=rec.split(",")
Category(b(0).toInt,b(1).toInt,b(2))
}).toDF

categoriesDF.registerTempTable("categories")

//DEPARTMENTS
val departments=sc.textFile("file:///home/cloudera/Desktop/data/retail_db/departments")
case class Department(
  department_id: Int,
  department_name: String)

val departmentsDF=departments.map(rec=>{
val b=rec.split(",")
Department(b(0).toInt,b(1))
}).toDF

departmentsDF.registerTempTable("departments")

// Query
sqlContext.sql("SELECT department_name, order_date, SUM(order_item_subtotal) FROM orders JOIN order_items ON order_id=order_item_order_id JOIN products ON order_item_product_id=product_id JOIN categories ON product_category_id=category_id JOIN departments ON category_department_id=department_id WHERE order_status='COMPLETE' OR order_status='CLOSED' GROUP BY order_date,department_name").collect().foreach(println)

Hi,

I am facing the same issue.
It seems the products table has some corrupt data.

Please refer to this.
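
A minimal sketch of one way to guard against the corrupt record before parsing, assuming the same comma-split approach as in the code above; it simply drops any products line whose price field (index 4) does not parse as a number:

// skip products records with an unparseable price field,
// e.g. a record whose product name itself contains a comma
val productsClean = products.filter(rec => {
  val fields = rec.split(",")
  fields.length >= 6 && scala.util.Try(fields(4).toFloat).isSuccess
})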


Yes. That seems to work. Thanks a ton

Can you execute it? Even after filtering that record (685), I am still facing issues. Do you?

Yes, I am able to execute it.

Here is the code

https://github.com/varunu28/CCA175-Practice/blob/master/Workshop%20Exercises/Assignment5.scala