Hadoop Certification - CCA - Scala - 04 Max By Key (includes SQL)

To get the customer ID with the maximum revenue per day, join the orders and order_items datasets:

Code:

package com.balu.spark.challenges

import com.typesafe.config._
import org.apache.hadoop.fs._
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Balus on 7/17/17.
  */
object CustomerIDWithMaxRevenuePerDay {

  def main(args: Array[String]): Unit = {

    // args(0) selects the environment block (e.g. dev or prod) in the Typesafe config.
    val prop = ConfigFactory.load()
    val parms = prop.getConfig(args(0))

    val conf = new SparkConf()
      .setAppName("max cust id revenue per day")
      .setMaster(parms.getString("executionMode"))
    val sc = new SparkContext(conf)

    val path = args(1)
    val outputPath = args(2)

    // Delete the output directory if it already exists so the job can be rerun.
    val hdfsFS = FileSystem.get(sc.hadoopConfiguration)
    if (hdfsFS.exists(new Path(outputPath))) {
      hdfsFS.delete(new Path(outputPath), true)
    }

    // orders: order_id, order_date, order_customer_id, order_status.
    // Key by order_id; keep (order_date, customer_id) as the value.
    val ordersRDD = sc.textFile(path + "/orders").map(order => {
      val ele = order.split(",")
      (ele(0).toInt, (ele(1), ele(2).toInt))
    })


    // order_items: order_item_id, order_item_order_id, order_item_product_id,
    // order_item_quantity, order_item_subtotal, order_item_product_price.
    // Key by order_item_order_id (ele(1)), not order_item_id (ele(0)),
    // so the join below matches each item to its parent order.
    val orderItemsRDD = sc.textFile(path + "/order_items").map(orderItem => {
      val ele = orderItem.split(",")
      (ele(1).toInt, ele(4).toFloat)
    })

    // Join on order_id, re-key by order_date with (customer_id, subtotal) values,
    // and group each day's entries together.
    val joinOrders = ordersRDD.join(orderItemsRDD)
      .map(rec => (rec._2._1._1, (rec._2._1._2, rec._2._2)))
      .groupByKey()

    // For each day, sort descending by subtotal and keep only the top entry.
    val maxOrders = joinOrders.map(order => {
      (order._1, order._2.toList.sortBy(rec => -rec._2).take(1))
    })

    // Preview the first 10 results on the driver ...
    maxOrders.map(rec => rec._1 + "\t" + rec._2.mkString("\t")).take(10).foreach(println)

    // ... and persist the full result to the output path cleared above.
    maxOrders.map(rec => rec._1 + "\t" + rec._2.mkString("\t")).saveAsTextFile(outputPath)
  }

}
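
As written, the job keeps the single order item with the highest subtotal per day rather than each customer's daily total. A minimal sketch of a variant that first totals revenue per (day, customer) with reduceByKey and then keeps each day's top customer, assuming the same ordersRDD and orderItemsRDD as above (the sample output below is from the main listing):

// Sketch: total revenue per (order_date, customer_id), then the per-day maximum,
// using reduceByKey twice instead of groupByKey + sortBy.
val dailyCustomerRevenue = ordersRDD.join(orderItemsRDD)
  .map(rec => ((rec._2._1._1, rec._2._1._2), rec._2._2)) // ((date, custId), subtotal)
  .reduceByKey(_ + _)                                    // daily total per customer

val maxPerDay = dailyCustomerRevenue
  .map { case ((date, custId), revenue) => (date, (custId, revenue)) }
  .reduceByKey((a, b) => if (a._2 >= b._2) a else b)     // keep the higher-revenue customer

maxPerDay.take(10).foreach(println)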

Output:

2013-10-05 00:00:00.0 (11515,499.95)
2013-08-11 00:00:00.0 (478,499.95)
2014-05-17 00:00:00.0 (9018,499.95)
2014-06-15 00:00:00.0 (7326,499.95)
2013-09-28 00:00:00.0 (2064,499.95)
2013-08-08 00:00:00.0 (8975,499.95)
2014-07-24 00:00:00.0 (6750,499.95)
2014-04-17 00:00:00.0 (8258,499.95)
2014-05-23 00:00:00.0 (10908,499.95)
2014-01-23 00:00:00.0 (11690,499.95)
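
For the SQL take promised in the title, here is a minimal Spark SQL sketch. It assumes a Spark 2.x SparkSession named spark, the same input path, and the standard retail_db column names; none of these appear in the listing above:

// Sketch: the same question in Spark SQL, assuming SparkSession `spark`.
val orders = spark.read.csv(path + "/orders")
  .toDF("order_id", "order_date", "order_customer_id", "order_status")
val orderItems = spark.read.csv(path + "/order_items")
  .toDF("order_item_id", "order_item_order_id", "order_item_product_id",
    "order_item_quantity", "order_item_subtotal", "order_item_product_price")

orders.createOrReplaceTempView("orders")
orderItems.createOrReplaceTempView("order_items")

// Rank customers by total revenue within each day and keep the top one.
spark.sql("""
  SELECT order_date, order_customer_id, revenue
  FROM (
    SELECT o.order_date, o.order_customer_id,
           SUM(CAST(oi.order_item_subtotal AS FLOAT)) AS revenue,
           RANK() OVER (PARTITION BY o.order_date
                        ORDER BY SUM(CAST(oi.order_item_subtotal AS FLOAT)) DESC) AS rnk
    FROM orders o
    JOIN order_items oi ON o.order_id = oi.order_item_order_id
    GROUP BY o.order_date, o.order_customer_id
  ) ranked
  WHERE rnk = 1
""").show(10)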