Exercise 19 - DataFrames and Spark Native SQL

apache-spark
scala
dataframes
#1

Problem Statement:

  • Get revenue for each day by joining orders and order_items
  • Steps
  • Update build.sbt with the dependencies, then run sbt package and eclipse
  • Develop 2 programs - one using data frame operations and the other using Spark Native SQL

Please provide the following output:

  • build.sbt
  • Code snippets using data frames as well as Spark Native SQL (a sample run command is sketched below)
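Most of the solutions below take the input path, the output path, and a Typesafe config key as program arguments. A minimal sketch of how such a packaged jar might be submitted, using the object name from post #2; the jar name, output path, and config key are placeholders, not from the thread:

    spark-submit \
      --class org.test.sparkDataFrames.DataFrameEx19 \
      sparkexercises_2.10-1.0.jar \
      /public/retail_db \
      /user/<username>/daily_revenue \
      prod

Here args(0) is the input directory, args(1) the output directory, and args(2) the config block ("prod" in this sketch) whose deployment value supplies the Spark master.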

#2

package org.test.sparkDataFrames

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

import com.typesafe.config.ConfigFactory

object DataFrameEx19 {
  case class Orders(
    order_id: Int,
    order_date: String,
    order_customer_id: Int,
    order_status: String)

  case class OrderItems(
    order_item_id: Int,
    order_item_order_id: Int,
    order_item_product_id: Int,
    order_item_quantity: Int,
    order_item_subtotal: Float,
    order_item_price: Float)

  def main(args: Array[String]): Unit = {
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().setAppName("spark exercise 17")
      .setMaster(appConf.getConfig(args(2)).getString("deployment"))
    val sc = new SparkContext(conf)
    val inputPath = args(0)
    val outputPath = args(1)
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))
    if (!inputPathExists) {
      println("Input Path does not exist")
      return
    }

    if (outputPathExists) {
      fs.delete(new Path(outputPath), true)
    }

    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._
    val ordersDF = sc.textFile(inputPath + "/orders").
      map { x =>
        val y = x.split(",")
        Orders(y(0).toInt, y(1), y(2).toInt, y(3))
      }.toDF()

    val ordersItemsDF = sc.textFile(inputPath + "/order_items").
      map { x =>
        val y = x.split(",")
        OrderItems(y(0).toInt, y(1).toInt, y(2).toInt, y(3).toInt, y(4).toFloat, y(5).toFloat)
      }.toDF()

    val ordersFiltered = ordersDF.filter(ordersDF("order_status") === "COMPLETE")
    val ordersjoin = ordersFiltered.join(ordersItemsDF, ordersFiltered("order_id") === ordersItemsDF("order_item_order_id"))
    ordersjoin.groupBy("order_date").sum("order_item_subtotal").sort("order_date").rdd.saveAsTextFile(outputPath)
  }
}
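The ConfigFactory.load() call above resolves against a Typesafe config file (for example application.properties on the classpath), and args(2) selects the block whose deployment key becomes the Spark master. A minimal sketch of such a file; the environment names and master values are assumptions:

    dev.deployment = local
    prod.deployment = yarn-client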

[2013-07-25 00:00:00.0,20030.320388793945]
[2013-07-26 00:00:00.0,42165.8807926178]


#3

build.sbt:
name := "Spark learnings"

version := "1.0"

// 2.11 doesn't seem to work
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.2",
  "org.apache.spark" % "spark-sql_2.10" % "2.0.2",
  "org.apache.hadoop" % "hadoop-client" % "2.4.0",
  "com.typesafe" % "config" % "1.3.1"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
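A small note on the mixed dependency styles above: in sbt, %% appends the Scala binary version to the artifact name, so with scalaVersion := "2.10.4" the following two declarations resolve to the same artifact:

    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.2"
    libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "2.0.2"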

code for using dataframes:
val ordersDF = sc.textFile("/public/retail_db/orders").
map(rec => {
val records = rec.split(",")
Orders(records(0).toInt, records(1).toString(), records(2).toInt, records(3).toString())
}).toDF()
val orderItemsDF = sc.textFile("/public/retail_db/order_items").
map(rec => {
val records = rec.split(",")
OrderItems(
records(0).toInt,
records(1).toInt,
records(2).toInt,
records(3).toInt,
records(4).toFloat,
records(5).toFloat)
}).toDF()
val ordersFiltered = ordersDF.
filter(ordersDF("order_status") === "COMPLETE")
val orders_orderitem_Join = ordersFiltered.join(orderItemsDF,
ordersFiltered("order_id") === orderItemsDF("order_item_order_id"))

orders_orderitem_Join.
  groupBy("order_date").
  agg(sum("order_item_subtotal")).
  sort("order_date").
  rdd.
  saveAsTextFile("spark_df/")

code for sql:
val ordersDF = sc.textFile("/public/retail_db/orders").
map(rec => {
val records = rec.split(",")
Orders(records(0).toInt, records(1).toString(), records(2).toInt, records(3).toString())
}).toDF()
ordersDF.registerTempTable("orders")
val orderItemsDF = sc.textFile("/public/retail_db/order_items").
map(rec => {
val records = rec.split(",")
OrderItems(
records(0).toInt,
records(1).toInt,
records(2).toInt,
records(3).toInt,
records(4).toFloat,
records(5).toFloat)
}).toDF()
orderItemsDF.registerTempTable("order_items")
val totalRevenueDaily = sqlContext.sql("select o.order_date, sum(oi.order_item_subtotal) " +
"from orders o join order_items oi " +
"on o.order_id = oi.order_item_order_id " +
"where o.order_status = 'COMPLETE' " +
"group by o.order_date " +
"order by o.order_date")
totalRevenueDaily.rdd.saveAsTextFile("spark_sql/")


#4

#build.sbt

name := " Spark Sql"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.2"

libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.2"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

#Code snippets using data frames

    val ordersDF = orders.toDF()
    val orderItemsDF = orderItems.toDF()

    val ordersSelect = ordersDF.select(ordersDF("id"), ordersDF("date"))
    val orderItemsSelect = orderItemsDF.select(orderItemsDF("orderId"), orderItemsDF("subtotal"))

    val joinedDF = ordersSelect.join(orderItemsSelect, ordersSelect("id") === orderItemsSelect("orderId"))

    joinedDF.groupBy("date").agg(sum("subtotal")).sort("date").show()

Using spark native sql

  val ordersDF = orders.toDF()
  val orderItemsDF = orderItems.toDF()

  ordersDF.registerTempTable("orders")
  orderItemsDF.registerTempTable("order_items")

   val joined = sqlContext.sql("select orders.date, sum(order_items.subtotal) from orders join order_items on orders.id = order_items.orderId group by orders.date ").show()
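Both snippets above start from orders and orderItems RDDs whose case classes are not shown in the post. A minimal sketch of definitions that would match the column names used (id, date, orderId, subtotal); the other field names and the file paths are assumptions:

    case class Order(id: Int, date: String, customerId: Int, status: String)
    case class OrderItem(orderId: Int, productId: Int, quantity: Int, subtotal: Float)

    // assumes a SQLContext named sqlContext and import sqlContext.implicits._ for toDF()
    val orders = sc.textFile("/public/retail_db/orders").
      map(_.split(",")).
      map(r => Order(r(0).toInt, r(1), r(2).toInt, r(3)))

    val orderItems = sc.textFile("/public/retail_db/order_items").
      map(_.split(",")).
      map(r => OrderItem(r(1).toInt, r(2).toInt, r(3).toInt, r(4).toFloat))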

#5

package sparkDataFrames.dataframes

import com.typesafe.config.ConfigFactory

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.hadoop.fs._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object exampleDataFrames {
case class Orders(order_id:Int, order_date: String, order_customer_id: Int, order_status: String)

def log(msg: String): Unit = {
println(msg);
}

case class OrderItems(order_item_id:Int, order_item_order_id: Int,order_item_product_id: Int,order_item_quantity: Int,
order_item_subtotal: Float,order_item_price: Float)

def main(args:Array[String]){
val appConf = ConfigFactory.load()
val conf = new SparkConf().setAppName("Arun Exercise 18").setMaster(appConf.getConfig(args(2)).getString("deployment"))
val sc = new SparkContext(conf)

//create SQl context
val sqlContext = new SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "2")
import sqlContext.implicits._

val inputPath = args(0)
val outputPath = args(1)

val fs = FileSystem.get(sc.hadoopConfiguration)
val inputPathExists = fs.exists(new Path(inputPath))
val outputPathExists = fs.exists(new Path(outputPath))

if (!inputPathExists) {
  log("input path doesnot exists")
  sys.exit(1)
}
if (outputPathExists) {
  fs.delete(new Path(outputPath), true)
  log("creating new outputpath as it exists")
}

val ordersDF = sc.textFile(inputPath +"/orders").map(rec=>{
val a=rec.split(",")
  Orders(a(0).toInt,a(1).toString(), a(2).toInt, a(3).toString())
}).toDF()

 val orderItemsDF = sc.textFile(inputPath +"/order_items").map(rec=>{
   val a= rec.split(",")
   OrderItems(a(0).toInt,a(1).toInt,a(2).toInt,a(3).toInt,a(4).toFloat,a(5).toFloat)
 }).toDF()


 val ordersFiltered = ordersDF.filter(ordersDF("order_status")==="COMPLETE")
 val ordersJoin = ordersFiltered.join(orderItemsDF,ordersFiltered("order_id")=== orderItemsDF("order_item_order_id"))

 ordersJoin.groupBy("order_date").agg(sum("order_item_subtotal"))
  .sort("order_date")
  .rdd.saveAsTextFile(outputPath)

}
}


#6

build.sbt

name := "RevenuePerDyaDF"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.2"
libraryDependencies += "com.typesafe" % "config" % "1.3.0"

using dataframes:

package main.scala


import com.typesafe.config._
import org.apache.spark.SparkContext, org.apache.spark.SparkConf
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
//import org.apache.spark.sql._
import org.apache.spark.sql.functions._

case class Orders( order_id:Int,order_date:String,order_customer_id: Int,order_status: String)

case class Order_items( 
    order_item_id:Int,
    order_item_order_id:Int,
    order_item_product_id: Int,
    order_item_quantity: Int,
    order_item_subtotal:Float,
    order_item_product_price:Float)
    
object RevenuePerDayDF {
  
   def main(args: Array[String]) {
    val appConf=ConfigFactory.load()
    val conf = new SparkConf().setAppName("Revenue using DF").setMaster(appConf.getConfig(args(2)).getString("deploymentMode"))
    val sc = new SparkContext(conf)
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    val inputPath=args(0)
    val outputPath=args(1)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))
    if(!inputPathExists)
    {
      println("Input does not exist")
      return
    }
    if(outputPathExists)
      fs.delete(new Path(outputPath),true)
      
    val ordersDF = sc.textFile(inputPath+"sqoop_import/orders").map(rec=>{
      val ordersData=rec.split(",")
      Orders(ordersData(0).toInt,ordersData(1).toString,ordersData(2).toInt,ordersData(3).toString)
    }).toDF()
     
    val orderItemsDF = sc.textFile(inputPath+"sqoop_import/order_items").map(rec=>{
      val ordersData=rec.split(",")
      Order_items(ordersData(0).toInt,ordersData(1).toInt,ordersData(2).toInt,ordersData(3).toInt,ordersData(4).toFloat,ordersData(5).toFloat)
    }).toDF()
    
    val completedOrdersDF=ordersDF.filter(ordersDF("order_status") === "COMPLETE")
    
    val ordersJoin = completedOrdersDF.join(orderItemsDF,ordersDF("order_id")=== orderItemsDF("order_item_order_id"))
    
    ordersJoin.groupBy("order_date").agg(sum("order_item_subtotal")).sort("order_date").rdd.saveAsTextFile(outputPath)
   }
  
}

using Spark native SQL

package main.scala

import com.typesafe.config._
import org.apache.spark.SparkContext, org.apache.spark.SparkConf
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
//import org.apache.spark.sql._
import org.apache.spark.sql.functions._


//orders and orderItems case classes are defined in other class RevenuePerDayDF
object RevenuePerDayDFSQL {
  def main(args: Array[String]) {
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().setAppName("Revenue using DF").setMaster(appConf.getConfig(args(2)).getString("deploymentMode"))
    val sc = new SparkContext(conf)
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    val inputPath = args(0)
    val outputPath = args(1)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))
    if (!inputPathExists) {
      println("Input does not exist")
      return
    }
    if (outputPathExists)
      fs.delete(new Path(outputPath), true)

    val ordersDF = sc.textFile(inputPath + "sqoop_import/orders").map(rec => {
      val ordersData = rec.split(",")
      Orders(ordersData(0).toInt, ordersData(1).toString, ordersData(2).toInt, ordersData(3).toString)
    }).toDF()

    val orderItemsDF = sc.textFile(inputPath + "sqoop_import/order_items").map(rec => {
      val ordersData = rec.split(",")
      Order_items(ordersData(0).toInt, ordersData(1).toInt, ordersData(2).toInt, ordersData(3).toInt, ordersData(4).toFloat, ordersData(5).toFloat)
    }).toDF()

    //val completedOrdersDF = ordersDF.filter(ordersDF("order_status") === "COMPLETE")

    ordersDF.registerTempTable("orders")
    
    orderItemsDF.registerTempTable("order_items")
    
    val totalRevenuePerDay = sqlContext.sql("select order_date, sum(order_item_subtotal) from orders join order_items on order_id=order_item_order_id where order_status='COMPLETE' group by order_date order by order_date")
    
    
    totalRevenuePerDay.rdd.saveAsTextFile(outputPath)
    
    
  }

}

#7

Pom Dependency ::

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>

package com.scala.avgrvn

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object SparkDF {
def main(args: Array[String]) {
// Spark Context
val conf = new SparkConf().setAppName("RevPerDayDF")
val sc = new SparkContext(conf)
//Sql Context
val sqlCont = new org.apache.spark.sql.SQLContext(sc)

 sqlCont.setConf("spark.sql.shuffle.partitions", "2")
 import sqlCont.implicits._
 
 val inputpath = "/user/saswat232/retail_db"
 val outputpath = "/user/saswat232/retail_db"
 val ordRDD = sc.textFile(inputpath+"/orders") // val ordRDD = sc.textFile("/user/saswat232/retail_db/orders")
 val OrdDF = ordRDD.map(rec => {val ele = rec.split(",")
    orders(ele(0).toInt, ele(1).toString(), ele(2).toInt, ele(3).toString)}).toDF()
    
 val ordItRDD = sc.textFile(inputpath+"/order_items")
 val ordItDF = ordItRDD.map(rec => {val ele = rec.split(",") 
    order_items(ele(0).toInt, ele(1).toInt, ele(2).toInt, ele(3).toInt, ele(4).toFloat, ele(5).toFloat)}).toDF()
    
 
  val OrdDFFilter = OrdDF.filter(OrdDF("order_status") === "COMPLETE")
  
  val OrdOrItJoin = OrdDFFilter.join(ordItDF,  OrdDFFilter("order_id") === ordItDF("order_item_order_id"))
    
 
  val JoinSelect = OrdOrItJoin.select(OrdDFFilter("order_date"), ordItDF("order_item_subtotal"))
  val GrpDay = JoinSelect.groupBy("order_date").agg(sum("order_item_subtotal")).rdd.saveAsTextFile(outputpath+"/DFOrdBy")  

}
}
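The orders and order_items case classes used in the map calls above are not included in the post; a minimal sketch of definitions compatible with the fields and columns referenced (field names beyond those actually used are assumptions):

    case class orders(order_id: Int, order_date: String,
      order_customer_id: Int, order_status: String)

    case class order_items(order_item_id: Int, order_item_order_id: Int,
      order_item_product_id: Int, order_item_quantity: Int,
      order_item_subtotal: Float, order_item_price: Float)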

Output :

[2013-07-25 00:00:00.0,20030.320388793945]
[2013-07-27 00:00:00.0,33156.210554122925]
[2013-07-29 00:00:00.0,45898.65076828003]
[2013-07-30 00:00:00.0,40590.21075248718]
[2013-08-02 00:00:00.0,36633.44071006775]
[2013-08-04 00:00:00.0,26161.970418930054]
[2013-08-06 00:00:00.0,41413.79079246521]
[2013-08-08 00:00:00.0,27359.07057762146]
[2013-08-11 00:00:00.0,21302.69044494629]
[2013-08-13 00:00:00.0,12468.530212402344]
[2013-08-15 00:00:00.0,37473.12078857422]

Spark SQL ::

package com.scala.avgrvn
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
object DFJoinSQl {

def main(args: Array[String]) {
// Spark Context
val conf = new SparkConf().setAppName("RevPerDayDF")
val sc = new SparkContext(conf)
//Sql Context
val sqlCont = new org.apache.spark.sql.SQLContext(sc)

 sqlCont.setConf("spark.sql.shuffle.partitions", "2")
 import sqlCont.implicits._
 
 val inputpath = "/user/saswat232/retail_db"
 val outputpath = "/user/saswat232/retail_db"
 val ordRDD = sc.textFile(inputpath+"/orders") // val ordRDD = sc.textFile("/user/saswat232/retail_db/orders")
 val OrdDF = ordRDD.map(rec => {val ele = rec.split(",")
    orders(ele(0).toInt, ele(1).toString(), ele(2).toInt, ele(3).toString)}).toDF()
    
 val ordItRDD = sc.textFile(inputpath+"/order_items")
 val ordItDF = ordItRDD.map(rec => {val ele = rec.split(",") 
    order_items(ele(0).toInt, ele(1).toInt, ele(2).toInt, ele(3).toInt, ele(4).toFloat, ele(5).toFloat)}).toDF()
    
    
    OrdDF.registerTempTable("orders")

    ordItDF.registerTempTable("order_items")
 

  val JoinSelect = sqlCont.sql("Select order_date, sum(order_item_subtotal) from orders join order_items on order_id = order_item_order_id where order_status = 'COMPLETE' group by order_date").rdd.saveAsTextFile("/user/saswat232/retail_db/DFSQLJoin")

}
}

Output ::

[2013-07-25 00:00:00.0,20030.320388793945]
[2013-07-27 00:00:00.0,33156.210554122925]
[2013-07-29 00:00:00.0,45898.65076828003]
[2013-07-30 00:00:00.0,40590.21075248718]
[2013-08-02 00:00:00.0,36633.44071006775]
[2013-08-04 00:00:00.0,26161.970418930054]
[2013-08-06 00:00:00.0,41413.79079246521]
[2013-08-08 00:00:00.0,27359.07057762146]
[2013-08-11 00:00:00.0,21302.69044494629]
[2013-08-13 00:00:00.0,12468.530212402344]


#8

build.sbt

name := "SQLPerDayRevenue"
version := "1.0"

libraryDependencies ++= Seq("mysql" % "mysql-connector-java" % "5.1.24",
"org.apache.spark" % "spark-core_2.10" % "1.6.2",
"com.typesafe" % "config" % "1.3.1",
"org.apache.spark" % "spark-sql_2.10" % "1.6.2"
)

Using DataFrames:
val orderDF = sc.textFile(inputPath + "/orders").map(rec=>{
val a= rec.split(",")
Order(a(0).toInt, a(1).toString(),a(2).toInt, a(3).toString())
}).toDF()

val ordersFiltered= orderDF.filter(orderDF("order_status")=== "COMPLETE")

val orderItemDF = sc.textFile(inputPath + "/order_items").map(rec=>{
  val a= rec.split(",")
  OrderItems(a(0).toInt, a(1).toInt ,a(2).toInt, a(3).toInt , a(4).toFloat, a(5).toFloat)
 }).toDF()
 
 val joinDF = orderItemDF.join(ordersFiltered,ordersFiltered("order_id")===orderItemDF("order_item_order_id"))
 
val revenue= joinDF.groupBy("order_date").agg(sum("order_item_subtotal")).rdd

revenue.saveAsTextFile(outputPath)

Using Spark native SQL
val orderDF = sc.textFile(inputPath + "/orders").map(rec=>{
val a= rec.split(",")
Order(a(0).toInt, a(1).toString(),a(2).toInt, a(3).toString())
}).toDF()

val ordersFiltered= orderDF.filter(orderDF("order_status")=== "COMPLETE")

ordersFiltered.registerTempTable("CompletedOrders")

val orderItemDF = sc.textFile(inputPath + "/order_items").map(rec=>{
  val a= rec.split(",")
  OrderItems(a(0).toInt, a(1).toInt ,a(2).toInt, a(3).toInt , a(4).toFloat, a(5).toFloat)
 }).toDF()
 
 orderItemDF.registerTempTable("OrderItems")

val revenue= sqlContext.sql(" select order_date , sum(order_item_subtotal) revenue_per_day " +
" from CompletedOrders join OrderItems " +
" on order_id=order_item_order_id " +
" group by order_date " +
"sort by order_date ")

revenue.rdd.saveAsTextFile(outputPath)
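One note on the query above: sort by in Spark/Hive SQL only orders rows within each partition, so the saved output is not guaranteed to be globally ordered by date. If a total ordering is wanted, as in the other solutions in this thread, order by can be used instead; a sketch of the same query with that change:

    val revenue = sqlContext.sql("select order_date, sum(order_item_subtotal) revenue_per_day " +
      "from CompletedOrders join OrderItems " +
      "on order_id = order_item_order_id " +
      "group by order_date " +
      "order by order_date")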


#9

#DataFrame

package retail.dataframes

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object SparkExercise19Df {
def main(args: Array[String]) {
val appConf = ConfigFactory.load()
val conf = new SparkConf().setAppName("Sneh Exercise 19 on DataFrames").setMaster(appConf.getConfig(args(2)).getString("deployment"))
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "2")
import sqlContext.implicits._

val inputPath = args(0)
val outputPath = args(1)

val fs = FileSystem.get(sc.hadoopConfiguration)
val inputPathExists = fs.exists(new Path(inputPath))
val outputPathExists = fs.exists(new Path(outputPath))

if (!inputPathExists) {
  println("Input Path does not exists")
  return
}

if (outputPathExists) {
  fs.delete(new Path(outputPath), true)
}

val ordersDF = sc.textFile(inputPath + "/orders").map(rec => {
  val a = rec.split(",")
  Orders(a(0).toInt, a(1).toString, a(2).toInt, a(3).toString)
}).toDF()



val orderItemsDF = sc.textFile(inputPath + "/order_items").map(rec => {
  val a = rec.split(",")
  OrderItems(a(0).toInt, a(1).toInt, a(2).toInt, a(3).toInt, a(4).toFloat, a(5).toFloat)
}).toDF()

val ordersFiltered = ordersDF.filter(ordersDF("order_status") === "COMPLETE")

val ordersJoin = ordersFiltered.join(orderItemsDF, ordersFiltered("order_id") === orderItemsDF("order_item_order_id"))

ordersJoin.groupBy("order_date").agg(sum("order_item_subtotal")).sort("order_date").rdd.saveAsTextFile(outputPath)

sc.stop()    

}
}


#Native SQL

package retail.dataframes

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object SparkExercise19Native {

def main(args: Array[String]): Unit = {
val appConf = ConfigFactory.load()
val conf = new SparkConf().setAppName("Sneh Exercise 19 on Native Sql").setMaster(appConf.getConfig(args(2)).getString("deployment"))
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "2")
import sqlContext.implicits._

val inputPath = args(0)
val outputPath = args(1)

val fs = FileSystem.get(sc.hadoopConfiguration)
val inputPathExists = fs.exists(new Path(inputPath))
val outputPathExists = fs.exists(new Path(outputPath))

if (!inputPathExists) {
  println("Input Path does not exists")
  return
}

if (outputPathExists) {
  fs.delete(new Path(outputPath), true)
}

val ordersDF = sc.textFile(inputPath + "/orders").map(rec => {
  val a = rec.split(",")
  Orders(a(0).toInt, a(1).toString, a(2).toInt, a(3).toString)
}).toDF()

ordersDF.registerTempTable("orders")

val orderItemsDF = sc.textFile(inputPath + "/order_items").map(rec => {
  val a = rec.split(",")
  OrderItems(a(0).toInt, a(1).toInt, a(2).toInt, a(3).toInt, a(4).toFloat, a(5).toFloat)
}).toDF()

orderItemsDF.registerTempTable("order_items")

val totalRevenueDaily = sqlContext.sql("select o.order_date, sum(oi.order_item_subtotal) " +
  "from orders o join order_items oi " +
  "on o.order_id = oi.order_item_order_id " +
  "where o.order_status = 'COMPLETE' " +
  "group by o.order_date " +
  "order by o.order_date")

totalRevenueDaily.rdd.saveAsTextFile(outputPath)

}
}



#10

Code snippets using data frames:

package com.spark.DataFrames

import com.typesafe.config.ConfigFactory

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.hadoop.fs._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object revenueExample {

case class Orders(
order_id: Int,
order_date: String,
order_customer_id: Int,
order_status: String)

case class Order_items(
order_item_id: Int,
order_item_order_id: Int,
order_item_product_id: Int,
order_item_quantity: Int,
order_item_subtotal: Float,
order_item_price: Float)

def main(args: Array[String]) {
val appConf = ConfigFactory.load()
val conf = new SparkConf().setAppName("Mahesh-DataFrames exercise").
setMaster(appConf.getConfig(args(2)).getString("deployment"))
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "2")
import sqlContext.implicits._

val inputPath = "/public/retail_db"
val outputPath = "/user/mahesh007/DFoutputtotalrevenue"

val fs = FileSystem.get(sc.hadoopConfiguration)
val inputPathExists = fs.exists(new Path(inputPath))
val outputPathExists = fs.exists(new Path(outputPath))

if (!inputPathExists) {
println("Input Path does not exist")
return
}

if (outputPathExists) {
fs.delete(new Path(outputPath), true)
}

val ordersDF = sc.textFile(inputPath + "/orders").
map(rec => {
val a = rec.split(",")
Orders(a(0).toInt, a(1).toString(), a(2).toInt, a(3).toString())
}).toDF()

val orderItemsDF = sc.textFile(inputPath + "/order_items").
map(rec => {
val a = rec.split(",")
Order_items(
a(0).toInt,
a(1).toInt,
a(2).toInt,
a(3).toInt,
a(4).toFloat,
a(5).toFloat)
}).toDF()

val ordersFiltered = ordersDF.
filter(ordersDF("order_status") === "COMPLETE")
val ordersJoin = ordersFiltered.join(orderItemsDF,
ordersFiltered("order_id") === orderItemsDF("order_item_order_id"))

ordersJoin.
  groupBy("order_date").
  agg(sum("order_item_subtotal")).
  sort("order_date").rdd.saveAsTextFile(outputPath)

}
}

Code snippets using spark native sql:

package com.spark.NativeSQL

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object SparkExercise19Native {

def main(args: Array[String]): Unit = {
val appConf = ConfigFactory.load()
val conf = new SparkConf().setAppName("Mahesh-NativeSQL exercise").setMaster(appConf.getConfig(args(2)).getString("deployment"))
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "2")
import sqlContext.implicits._

val inputPath = args(0)
val outputPath = args(1)

val fs = FileSystem.get(sc.hadoopConfiguration)
val inputPathExists = fs.exists(new Path(inputPath))
val outputPathExists = fs.exists(new Path(outputPath))

if (!inputPathExists) {
println(“Input Path does not exists”)
return
}

if (outputPathExists) {
fs.delete(new Path(outputPath), true)
}

val ordersDF = sc.textFile(inputPath + "/orders").map(rec => {
val a = rec.split(",")
Orders(a(0).toInt, a(1).toString, a(2).toInt, a(3).toString)
}).toDF()

ordersDF.registerTempTable("orders")

val orderItemsDF = sc.textFile(inputPath + "/order_items").map(rec => {
val a = rec.split(",")
OrderItems(a(0).toInt, a(1).toInt, a(2).toInt, a(3).toInt, a(4).toFloat, a(5).toFloat)
}).toDF()

orderItemsDF.registerTempTable("order_items")

val totalRevenueDaily = sqlContext.sql("select o.order_date, sum(oi.order_item_subtotal) " +
"from orders o join order_items oi " +
"on o.order_id = oi.order_item_order_id " +
"where o.order_status = ‘COMPLETE’ " +
"group by o.order_date " +
“order by o.order_date”)

totalRevenueDaily.rdd.saveAsTextFile(outputPath)

}
}

#11

POM.xml:

		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.12</version>
			<!--scope>test</scope-->
		</dependency>

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>

		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>5.1.24</version>
		</dependency>

		<dependency>
			<groupId>com.typesafe</groupId>
			<artifactId>config</artifactId>
			<version>1.3.0</version>
		</dependency>

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>

Code snippets : DataFrames

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import com.typesafe.config._
import org.apache.spark.sql
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SQLContext


object RevEachDay {

  def main(args: Array[String]) {

    val config = ConfigFactory.load()
    val conf = new SparkConf().setAppName("Dataframes Rev Each Day").setMaster(config.getConfig(args(0)).getString("deployment"))

    val sc = new SparkContext(conf)
    val ssc = new SQLContext(sc)
    ssc.setConf("spark.sql.shuffle.partitions", "2")
    
    import ssc.implicits._

    val fs = FileSystem.get(sc.hadoopConfiguration)

    val inputpath1 = new Path(args(1))
    val inputpath2 = new Path(args(2))
    val outputpath = new Path(args(3))
    val inputPath1Exists = fs.exists(inputpath1)
    val inputPath2Exists = fs.exists(inputpath2)
    val outputPathExists = fs.exists(outputpath)


    if (!inputPath1Exists) {
      println("Input Path 1 does not exists")
      return
    }

    if (!inputPath2Exists) {
      println("Input Path 2 does not exists")
      return
    }

    if (outputPathExists) {
      fs.delete(outputpath, true)
    }
    
    val loadFileOrders = sc.textFile(inputpath1.toString())    
    val OrdersMap = loadFileOrders.map(rec=>(rec.split(",")))
    val OrdersMapDF= OrdersMap.map(fields=> Orders(fields(0).toInt, fields(1).toString, fields(2).toInt, fields(3).toString)).toDF
    val OrdersMapDFFilter = OrdersMapDF.filter(OrdersMapDF("order_status") === "COMPLETE")
    
    val loadFileOrder_items = sc.textFile(inputpath2.toString())    
    val Order_itemsMap = loadFileOrder_items.map(rec=>(rec.split(",")))
    val Order_itemsMapDF= Order_itemsMap.map(fields=> Order_Items(fields(0).toInt, fields(1).toInt, fields(2).toInt, fields(3).toInt, fields(4).toFloat, fields(5).toFloat)).toDF
    
    val joinOrderOrderItems = OrdersMapDFFilter.join(Order_itemsMapDF, OrdersMapDFFilter("order_id") === Order_itemsMapDF("order_item_order_id"))
    
    val joinSelect = joinOrderOrderItems.select(joinOrderOrderItems("order_date"), joinOrderOrderItems("order_item_subtotal"))
    
    val RevEachDay = joinSelect.groupBy("order_date").agg(sum("order_item_subtotal")).rdd
    
    RevEachDay.saveAsTextFile(outputpath.toString())
  }

}
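Both this snippet and the Native SQL one below reference Orders and Order_Items case classes that are not shown in the post. A minimal sketch of compatible definitions; field names other than the columns actually used (order_id, order_date, order_status, order_item_order_id, order_item_subtotal) are assumptions:

    // assumed case class definitions matching the retail_db column layout
    case class Orders(order_id: Int, order_date: String,
      order_customer_id: Int, order_status: String)

    case class Order_Items(order_item_id: Int, order_item_order_id: Int,
      order_item_product_id: Int, order_item_quantity: Int,
      order_item_subtotal: Float, order_item_price: Float)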

Output:
[2013-07-25 00:00:00.0,20030.320388793945]
[2013-07-27 00:00:00.0,33156.210554122925]
[2013-07-29 00:00:00.0,45898.65076828003]
[2013-07-30 00:00:00.0,40590.21075248718]
[2013-08-02 00:00:00.0,36633.44071006775]

Code snippets : Native SQL

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import com.typesafe.config._
import org.apache.spark.sql
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SQLContext

object RevEachDaySparkSql {
  def main(args: Array[String]) {

    val config = ConfigFactory.load()
    val conf = new SparkConf().setAppName("Dataframes Rev Each Day").setMaster(config.getConfig(args(0)).getString("deployment"))

    val sc = new SparkContext(conf)
    val ssc = new SQLContext(sc)
    ssc.setConf("spark.sql.shuffle.partitions", "2")

    import ssc.implicits._

    val fs = FileSystem.get(sc.hadoopConfiguration)

    val inputpath1 = new Path(args(1))
    val inputpath2 = new Path(args(2))
    val outputpath = new Path(args(3))
    val inputPath1Exists = fs.exists(inputpath1)
    val inputPath2Exists = fs.exists(inputpath2)
    val outputPathExists = fs.exists(outputpath)

    if (!inputPath1Exists) {
      println("Input Path 1 does not exists")
      return
    }

    if (!inputPath2Exists) {
      println("Input Path 2 does not exists")
      return
    }

    if (outputPathExists) {
      fs.delete(outputpath, true)
    }

    val loadFileOrders = sc.textFile(inputpath1.toString())
    val OrdersMap = loadFileOrders.map(rec => (rec.split(",")))
    val OrdersMapDF = OrdersMap.map(fields => Orders(fields(0).toInt, fields(1).toString, fields(2).toInt, fields(3).toString)).toDF
    val OrdersMapDFFilter = OrdersMapDF.filter(OrdersMapDF("order_status") === "COMPLETE")

    val loadFileOrder_items = sc.textFile(inputpath2.toString())
    val Order_itemsMap = loadFileOrder_items.map(rec => (rec.split(",")))
    val Order_itemsMapDF = Order_itemsMap.map(fields => Order_Items(fields(0).toInt, fields(1).toInt, fields(2).toInt, fields(3).toInt, fields(4).toFloat, fields(5).toFloat)).toDF

    OrdersMapDFFilter.registerTempTable("orders")

    Order_itemsMapDF.registerTempTable("order_items")

    val revEachDaySparkSql = ssc.sql("Select order_date, sum(order_item_subtotal) from orders join order_items on order_id = order_item_order_id where order_status = 'COMPLETE' group by order_date").rdd

    revEachDaySparkSql.saveAsTextFile(outputpath.toString())
  }

} 

Output

[2013-07-25 00:00:00.0,20030.320388793945]
[2013-07-27 00:00:00.0,33156.210554122925]
[2013-07-29 00:00:00.0,45898.65076828003]
[2013-07-30 00:00:00.0,40590.21075248718]
[2013-08-02 00:00:00.0,36633.44071006775]