Exercise 16 - Spark - Build applications using Scala IDE and deploy on cluster

eclipse
apache-spark
scala
#1

Problem:

  • Build the program developed in the exercise below as an application using Scala IDE
    Exercise 15 - Spark - Compute average revenue per day for all completed orders

  • Steps involved

  • Update build.sbt with the Spark dependency, then run sbt package and sbt eclipse (see the build.sbt sketch after this list)
    libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.2"

  • Refresh the project in Scala IDE

  • Use Typesafe Config to externalize the execution mode (see the application.properties sketch after this list)

  • Perform validations on the input and output paths
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))

    if (!inputPathExists) {
      println("Input path does not exist")
      return
    }

    if (outputPathExists) {
      fs.delete(new Path(outputPath), true)
    }

  • Build the jar file and ship it to the cluster

  • Run the jar file on the cluster, for example:

spark-submit --class "SimpleApp" \
  --master yarn \
  --executor-memory 512m \
  --total-executor-cores 1 \
  simple-scala_2.11-1.0.jar yarn-client INPUT_PATH OUTPUT_PATH EXECUTION_MODE
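
For reference, a minimal sketch of the build and configuration pieces the steps above describe. The project name, the Scala and plugin versions, the property key name, and the dev entry are illustrative assumptions, not part of the exercise (the answers below use their own names).

build.sbt
    name := "simple-scala"
    version := "1.0"
    // assumed Scala version; it should match the _2.10 suffix of the Spark artifact
    scalaVersion := "2.10.6"
    libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.2"
    // Typesafe Config, only needed explicitly if it is not already on the classpath transitively
    libraryDependencies += "com.typesafe" % "config" % "1.2.1"

project/plugins.sbt (required for the sbt eclipse command)
    addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")

src/main/resources/application.properties
    dev.deploymentMode=local
    prod.deploymentMode=yarn-client

Lookup in the driver, where args(2) selects the dev or prod block:
    val appConf = ConfigFactory.load()
    val conf = new SparkConf()
      .setAppName("SimpleApp")
      .setMaster(appConf.getConfig(args(2)).getString("deploymentMode"))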

Please provide the following

  • Code from Scala IDE
  • Spark submit command
  • Output path

#2

Code:

package main.scala


import com.typesafe.config._
import org.apache.spark.SparkContext, org.apache.spark.SparkConf
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

object SparkFirstProgm {
  def main(args: Array[String]) {
    val appConf=ConfigFactory.load()
    val conf = new SparkConf().setAppName("Avg Revenue").setMaster(appConf.getConfig(args(2)).getString("deploymentMode"))
    val sc = new SparkContext(conf)
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPath=args(0)
    val outputPath=args(1)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))
    if(!inputPathExists)
    {
      println("Input does not exist")
      return
    }
    if(outputPathExists)
      fs.delete(new Path(outputPath),true)
    val orders = sc.textFile(inputPath+"sqoop_import/orders")
    val orderItems=sc.textFile(inputPath+"sqoop_import/order_items")
    val ordersRDD = orders.map(rec=>(rec.split(",")(0),rec))
    val filteredOrdersRDD = ordersRDD.filter(rec=>rec._2.split(",")(3)=="COMPLETE")
    val orderItemsRDD= orderItems.map(rec=>(rec.split(",")(1),rec))
    val joinRDD = filteredOrdersRDD.join(orderItemsRDD)
    val joinParsedRDD = joinRDD.map(rec=>(rec._1,(rec._2._1.split(",")(1),rec._2._2.split(",")(4).toFloat)))
    val UniqueJoinParsedRDD= joinParsedRDD.map(rec=>((rec._1,rec._2._1),rec._2._2)).reduceByKey(_+_)
    val UniqueJoinParsedRDDMap= UniqueJoinParsedRDD.map(rec=>(rec._1._1,(rec._1._2,rec._2.toFloat)))
    val revenueMap = UniqueJoinParsedRDDMap.map(rec=>(rec._2._1,1))
    val ordersPerDay = revenueMap.reduceByKey(_+_)
    val revenuePerDay = UniqueJoinParsedRDDMap.map(rec=>(rec._2._1,rec._2._2))
    val aggRevenuePerDay = revenuePerDay.reduceByKey(_+_)
    val totalRevenuePerDay = ordersPerDay.join(aggRevenuePerDay)
    val totalAggRevenuePerDay = totalRevenuePerDay.map(rec=>(rec._1,rec._2._2/rec._2._1))
    totalAggRevenuePerDay.sortByKey().take(10).foreach(println)
    totalAggRevenuePerDay.sortByKey().saveAsTextFile(outputPath)
  }
}

Spark-submit command
spark-submit --class "main.scala.SparkFirstProgm" averagerevenue_2.11-1.0.jar /user/nagellarajashyam/ /user/nagellarajashyam/output prod

prod is defined in application.properties as prod.deploymentMode=yarn-client
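
So the corresponding application.properties on the classpath would look roughly like this; only the prod line is stated above, the dev entry is an assumed addition for local runs:

    dev.deploymentMode=local
    prod.deploymentMode=yarn-client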

output:

[nagellarajashyam@gw01 version2]$ hdfs dfs -ls /user/nagellarajashyam/out*
Found 3 items
-rw-r--r-- 3 nagellarajashyam hdfs 0 2016-12-20 00:27 /user/nagellarajashyam/output/_SUCCESS
-rw-r--r-- 3 nagellarajashyam hdfs 6417 2016-12-20 00:27 /user/nagellarajashyam/output/part-00000
-rw-r--r-- 3 nagellarajashyam hdfs 5731 2016-12-20 00:27 /user/nagellarajashyam/output/part-00001


#3

#Code from Scala IDE

##FromTraining.scala

package training

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import com.typesafe.config.ConfigFactory

case class Orders(id: Int, date: String, customerID: Int, status: String)
case class OrderItems(id: Int, orderId: Int, productId: Int, quantity: Int, subtotal: Float, productPrice: Float)

object FromTraining {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val appConfig = ConfigFactory.load()

    val master = appConfig.getConfig(args(2)).getString("deployment")

    val conf = new SparkConf().setMaster(master).setAppName("Spark word count")
    val sc = new SparkContext(conf)

    val inputPath = args(0)
    val outputPath = args(1)

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))

    if (!inputPathExists) {
      println("Input Path does not exists")
      return
    }

    if (outputPathExists) {
      fs.delete(new Path(outputPath), true)
    }

    //"hdfs://quickstart:8020/user/analytics/retail_db/orders"

    val orders = sc.textFile(inputPath + "/orders").flatMap { each =>
      each.split(",") match {
        case Array(id, date, customerId, status) => Some(Orders(id.toInt, date, customerId.toInt, status))
        case _ => None
      }
    }
    val orderItems = sc.textFile(inputPath + "/order_items").flatMap { each =>
      each.split(",") match {
        case Array(id, orderId, productId, quantity, subtotal, productPrice) => Some(OrderItems(id.toInt, orderId.toInt, productId.toInt, quantity.toInt, subtotal.toFloat, productPrice.toFloat))
        case _ => None
      }
    }

    val completedOrders = orders.filter(eachOrder => eachOrder.status.contains("COMPLETE"))

    val ordersToFilter = completedOrders.map(eachOrder => (eachOrder.id, eachOrder.date))

    val orderItemsToFilter = orderItems.map(eachOrderItem => (eachOrderItem.orderId, eachOrderItem.subtotal))

    val joinedOrders = orderItemsToFilter.join(ordersToFilter)

    val afterJoin = joinedOrders.flatMap {
      case (orderId, (subTotal, orderDate)) => Some(orderDate, subTotal)
    }

    val aggregatedResult = afterJoin.aggregateByKey((0.0, 0))((a, v) => (a._1 + v, a._2 + 1), (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))

    val averageResultsForEachDate = aggregatedResult.map {
      case (date, (totalrevenue, totalDays)) =>
        (date, totalrevenue / totalDays)
    }

    val sortedResult = averageResultsForEachDate.sortByKey(ascending = false) //.take(5).foreach(println)

    sortedResult.saveAsTextFile(outputPath)

  }
}

##application.properties:

dev.deployment=local[1]
prod.deployment=yarn-client

#Spark submit command

spark-submit --class training.FromTraining scala-with-tests_2.10-1.0.jar /public/retail_db /user/farhanmisarwala/output/spark-output prod

#Output path

/user/farhanmisarwala/output/spark-output


#4

Code from Scala IDE

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import com.typesafe.config.ConfigFactory

object AverageRevenue {

  def main(args: Array[String]) {
    val appconf = ConfigFactory.load()
    val conf = new SparkConf().setAppName("Average Revenue").setMaster(appconf.getConfig(args(2)).getString("deployement"))
    val sc = new SparkContext(conf)

    val fs = FileSystem.get(sc.hadoopConfiguration)

    val inputPath = args(0)
    val outputPath = args(1)

    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))

    if (!inputPathExists) {
      println("Input path does not exist")
      return
    }

    if (outputPathExists) {
      fs.delete(new Path(outputPath), true)
    }

    val orders = sc.textFile(inputPath + "/orders")
    val orderItems = sc.textFile(inputPath + "/order_items")

    // Revenue per order: (order_id, sum of order_item_subtotal)
    val orderItemsMap = orderItems.map(rec => (rec.split(",")(1).toInt, rec.split(",")(4).toDouble))
    val redOrderItemsMap = orderItemsMap.reduceByKey(_ + _)

    // Completed orders as (order_id, order_date)
    val completeOrders = orders.filter(rec => rec.split(",")(3) == "COMPLETE")
    val completeOrdersMap = completeOrders.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))

    val joinRdd = completeOrdersMap.join(redOrderItemsMap)
    val revenuerdd = joinRdd.map(rec => (rec._2._1, rec._2._2))

    // (date, (total revenue, completed order count)) -> average revenue per day
    val aggRdd = revenuerdd.aggregateByKey((0.0, 0))(
      (a, v) => (a._1 + v, a._2 + 1),
      (v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
    val avgRevenue = aggRdd.map(rec => (rec._1, (rec._2._1 / rec._2._2)))
    val sortavgRevenue = avgRevenue.sortByKey()
    sortavgRevenue.saveAsTextFile(outputPath)
  }
}


Spark submit command

spark-submit --class "AverageRevenue" averagerevenue_2.11-1.0.jar /public/retail_db /user/parulshine92/sparkOutput prod

Output path
/user/parulshine92/sparkOutput


#5

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

object SparkExercise16 {
  def main(args: Array[String]) {
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().setAppName("Sneh Exercise 15").setMaster(appConf.getConfig(args(2)).getString("deployment"))
    val sc = new SparkContext(conf)

    val inputPath = args(0)
    val outputPath = args(1)

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))

    if (!inputPathExists) {
      println("Input path does not exist")
      return
    }

    if (outputPathExists) {
      fs.delete(new Path(outputPath), true)
    }

    val orderItems = sc.textFile(inputPath + "/order_items")
    val oiMap = orderItems.map(rec => (rec.split(",")(1).toInt, rec.split(",")(4).toDouble))
    val revenueOI = oiMap.reduceByKey((a, b) => a + b)

    val orders = sc.textFile(inputPath + "/orders")
    val orderComplete = orders.filter(rec => rec.split(",")(3) == "COMPLETE").map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))

    val joinedOiOc = orderComplete.join(revenueOI)

    val revenueOiOc = joinedOiOc.map(record => (record._2._1, record._2._2))

    val revenuebefAvg = revenueOiOc.aggregateByKey((0.0, 0))(
      (a, v) => (a._1 + v, a._2 + 1),
      (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))

    val revenueAfterAvg = revenuebefAvg.map(record => (record._1, record._2._1 / record._2._2))

    revenueAfterAvg.sortByKey().take(10).foreach(println)

    revenueAfterAvg.saveAsTextFile(outputPath)
    sc.stop()
  }
}


#application.properties

dev.deployment=local
prod.deployment=yarn-client


#Spark submit command

spark-submit --class SparkExercise15 sparkexercise15_2.11-1.0.jar /public/retail_db /user/infosnehasish/dec20out01 prod

spark-submit --class SparkExercise15 sparkexercise15_2.11-1.0.jar D:/data/retail_db/public/retail_db D:/data/output dev


#6

Code from Scala IDE

package com.spark.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import com.typesafe.config._
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object AvgRevPerDay {

  def main(args: Array[String]) {
    
    val config = ConfigFactory.load()
    val conf = new SparkConf().setAppName("Avg Rev Per Day").setMaster(config.getConfig(args(0)).getString("deployment"))
    val sc = new SparkContext(conf)

    val fs = FileSystem.get(sc.hadoopConfiguration)

    val inputpath1 = new Path(args(1))
    val inputpath2 = new Path(args(2))
    val outputpath = new Path(args(3))
    val inputPath1Exists = fs.exists(inputpath1)
    val inputPath2Exists = fs.exists(inputpath2)
    val outputPathExists = fs.exists(outputpath)

    if (!inputPath1Exists) {
      println("Input Path does not exists")
      return
    }

    if (outputPathExists) {
      fs.delete(outputpath, true)
    }

    val loadFile = sc.textFile(inputpath1.toString())
    val filterLoadFile = loadFile.filter(rec => rec.split(",")(3).equals("COMPLETE"))
    val reqFields = filterLoadFile.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))

    val loadOrderItems = sc.textFile(inputpath2.toString())
    val loadOrderItemMap = loadOrderItems.map(rec => (rec.split(",")(1).toInt, rec.split(",")(4).toDouble))
    val reduceOrderItemMap = loadOrderItemMap.reduceByKey((acc, value) => acc + value)

    val joinsFiles = reduceOrderItemMap.join(reqFields)
    val mapAgain = joinsFiles.map(rec => (rec._2._2, rec._2._1))

    val agrFile = mapAgain.aggregateByKey((0.0, 0))((a, v) => (a._1 + v, a._2 + 1), (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))

    val avgRev = agrFile.map(rec => (rec._1, rec._2._1 / rec._2._2))
    
    //avgRev.take(20).foreach(println)

    avgRev.saveAsTextFile(outputpath.toString())
  }

}

Spark submit command

spark-submit --class "com.spark.scala.AvgRevPerDay" \
--master yarn \
--executor-memory 512m \
--total-executor-cores 1 \
AvgRevPerDay.jar \
prod \
/user/jasonbourne/retail_db/orders/part-00000 \
/user/jasonbourne/retail_db/order_items/part-00000 \
/user/jasonbourne/avgrevperday

Output path

hadoop fs -ls /user/jasonbourne/avgrevperday
Found 3 items
-rw-r--r--   3 jasonbourne hdfs          0 2016-12-20 01:32 /user/jasonbourne/avgrevperday/_SUCCESS
-rw-r--r--   3 jasonbourne hdfs       7575 2016-12-20 01:32 /user/jasonbourne/avgrevperday/part-00000
-rw-r--r--   3 jasonbourne hdfs       7419 2016-12-20 01:32 /user/jasonbourne/avgrevperday/part-00001

#7

Code For IDE ::

package com.scala.avgrvn

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import com.typesafe.config.ConfigFactory

object AvgRevenue {
  def main(args: Array[String]) {

    val config = ConfigFactory.load()
    val master = config.getConfig(args(2)).getString("deployment")
    val conf = new SparkConf().setMaster(master).setAppName("Avg-Rev-Calc")

    val sc = new SparkContext(conf)

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputpath = args(0)
    val outputPath = args(1)
    val inputPathExists = fs.exists(new Path(inputpath))
    val outputPathExists = fs.exists(new Path(outputPath))

    if (!inputPathExists) {
      println("Input path does not exist")
      return
    }

    if (outputPathExists) {
      fs.delete(new Path(outputPath), true)
    }

    val orders = sc.textFile(inputpath + "/orders")
    val cmpl_orders = orders.filter(_.split(",")(3) == "COMPLETE")
    // cmpl_orders.take(5).foreach(println)
    val ordersMap = cmpl_orders.map(a => (a.split(",")(0).toInt, a.split(",")(1)))
    // ordersMap.take(5).foreach(println)

    val order_item = sc.textFile(inputpath + "/order_items")
    // order_item.take(5).foreach(println)
    val ordersItemMap = order_item.map(a => (a.split(",")(1).toInt, a.split(",")(4).toDouble))
    // ordersItemMap.take(5).foreach(println)
    val redOitem = ordersItemMap.reduceByKey(_ + _)
    // redOitem.take(5).foreach(println)

    val ordJoin = redOitem.join(ordersMap).map(_._2)
    // ordJoin.take(5).foreach(println)

    val ordJoinmap = ordJoin.map(rec => (rec._2, rec._1))
    // ordJoinmap.take(5).foreach(println)

    val agrFile = ordJoinmap.aggregateByKey((0.0, 0))((a, v) => (a._1 + v, a._2 + 1), (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))
    val avgrev = agrFile.map(rec => (rec._1, rec._2._1 / rec._2._2))
    avgrev.saveAsTextFile(outputPath + "/avrg_rev")
  }
}

Spark Submit ::

spark-submit --class "com.scala.avgrvn.AvgRevenue" \
  scalaMvn-0.0.1-SNAPSHOT.jar \
  /user/saswat232/retail_db \
  /user/saswat232/retail_db/avgrvn \
  prod

[saswat232@gw01 ~]$ hadoop fs -ls /user/saswat232/retail_db/avgrvn/avrg_rev
Found 3 items
-rw-r--r-- 3 saswat232 hdfs 0 2016-12-20 03:31 /user/saswat232/retail_db/avgrvn/avrg_rev/_SUCCESS
-rw-r--r-- 3 saswat232 hdfs 321485 2016-12-20 03:31 /user/saswat232/retail_db/avgrvn/avrg_rev/part-00000
-rw-r--r-- 3 saswat232 hdfs 321941 2016-12-20 03:31 /user/saswat232/retail_db/avgrvn/avrg_rev/part-00001
