Exercise 08 - Write Spark SQL for both 5 and 6

Resources:

  • Click here for $35 coupon for CCA 175 Spark and Hadoop Developer using Python.
  • Click here for $35 coupon for CCA 175 Spark and Hadoop Developer using Scala.
  • Click here for $25 coupon for HDPCD:Spark using Python.
  • Click here for $25 coupon for HDPCD:Spark using Scala.
  • Click here for access to state of the art 13 node Hadoop and Spark Cluster

Description

  • Understanding data frames
  • Ability to write Spark SQL

Problem Statement

  • For Exercise 5
    • Create hive tables for retail_db
    • implement exercise 5 using spark hive sql
    • Writing simple sql is good enough
    • If interested, you can go with sbt application using intellij or eclipse with Scala IDE
    • Use 2 to 4 tasks while executing the queries
    • Run the program on the cluster and observe DAG, stages, executors and executor tasks (use spark-shell, create HiveContext object and then run the query using hiveContext.sql)
  • For Exercise 6
  • Data is available under hdfs path /public/nyse
  • Create data frame for the above data set
  • implement exercise 6 using spark native sql
  • Develop sbt project using intellij or eclipse with Scala IDE
  • Validate locally - you can copy data from the gateway node /data/nyse to your PC to validate locally either by using tools like winscp or scp command.
  • Build the jar file using sbt and then run on the cluster
  • Use 2 to 4 tasks while executing the queries
  • Run the program on the cluster and observe DAG, stages, executors and executor tasks
  • Sample query to get top 5 stocks every month by volume (using windowing functions)
    select * from (select trademonth, stockticker, monthly_volume, dense_rank() over (partition by trademonth order by monthly_volume desc) rnk from (select substr(tradedate, 1, 7) trademonth, stockticker, sum(volume) monthly_volume from stocks_eod group by substr(tradedate, 1, 7), stockticker) q1) q2 where rnk <= 5 order by trademonth, rnk;
1 Like

I need help joining two sql.DataFrames: the step (val rev = ordersJoin.join(prodDept)) is taking too long. Is there another way to do this?

/**
 * Created by Ravinder on 3/21/2017.
 */

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    import com.typesafe.config._
    import org.apache.hadoop.fs._
    import org.apache.spark.sql._
    import org.apache.spark.sql.functions
    import org.apache.spark.sql.SQLContext

/**
 * One row of the `orders` data set.
 *
 * In this file instances are built from the four leading comma-separated
 * fields of a text line, and the field names become the DataFrame column
 * names used by the SQL queries.
 *
 * @param order_id          first field, parsed as an integer
 * @param order_date        second field, kept as the raw string
 * @param order_customer_id third field, parsed as an integer
 * @param order_status      fourth field, kept as the raw string
 */
case class Orders(
  order_id: Int,
  order_date: String,
  order_customer_id: Int,
  order_status: String
)

/**
 * One row of the `order_items` data set.
 *
 * In this file instances are built from six comma-separated fields of a text
 * line; the field names become the DataFrame column names used in SQL.
 *
 * @param order_item_id            first field, parsed as an integer
 * @param order_item_order_id      second field; compared with `order_id` in the join query
 * @param order_item_product_id    third field, parsed as an integer
 * @param order_item_quantity      fourth field, parsed as an integer
 * @param order_item_subtotal      fifth field, parsed as a float
 * @param order_item_product_price sixth field, parsed as a float
 */
case class OrderItems(
  order_item_id: Int,
  order_item_order_id: Int,
  order_item_product_id: Int,
  order_item_quantity: Int,
  order_item_subtotal: Float,
  order_item_product_price: Float
)

/**
 * One row of the `departments` data set.
 *
 * @param department_id   first comma-separated field, parsed as an integer
 * @param department_name second comma-separated field, kept as a string
 */
case class Departments(
  department_id: Int,
  department_name: String
)

/**
 * One row of the `categories` data set.
 *
 * @param category_id            first comma-separated field, parsed as an integer
 * @param category_department_id second field; compared with `department_id` in the join query
 * @param category_name          third field, kept as a string
 */
case class Categories(
  category_id: Int,
  category_department_id: Int,
  category_name: String
)

/**
 * The two leading columns of a `products` row; the remaining columns of the
 * text line are not read by this file.
 *
 * @param product_id          first comma-separated field, parsed as an integer
 * @param product_category_id second field; compared with `category_id` in the join query
 */
case class Products(
  product_id: Int,
  product_category_id: Int
)

/**
 * Spark SQL job over the retail_db text files: computes revenue per order date
 * and product for CLOSED/COMPLETE orders and attaches each product's department.
 *
 * Arguments:
 *   - args(0): key of the Typesafe-config section whose `deploymentMode` entry
 *              is used as the Spark master
 *   - args(1): base input path containing `orders/`, `order_items/`,
 *              `products/`, `categories/` and `departments/`
 *   - args(2): output path; it is removed first if it already exists
 */
object orders_sql {
  def main(args: Array[String]): Unit = {
    val props = ConfigFactory.load()
    val conf = new SparkConf()
      .setAppName("Total Revenue")
      .setMaster(props.getConfig(args(0)).getString("deploymentMode"))

    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val fs = FileSystem.get(sc.hadoopConfiguration)
      val inpath = new Path(args(1))
      val outpath = new Path(args(2))

      if (!fs.exists(inpath)) {
        println("Invalid path " + inpath)
      } else {
        // saveAsTextFile refuses to write into an existing directory.
        if (fs.exists(outpath)) {
          fs.delete(outpath, true)
        }

        val ordersRdd = sc.textFile(inpath + "/orders/")
        val orderItemsRdd = sc.textFile(inpath + "/order_items/")
        val productsRdd = sc.textFile(inpath + "/products/")
        val categoriesRdd = sc.textFile(inpath + "/categories/")
        val departmentRdd = sc.textFile(inpath + "/departments/")

        // Parse each comma-separated text line into its case class and expose
        // the result as a temp table so it can be queried with SQL.
        ordersRdd.map { rec =>
          val o = rec.split(",")
          Orders(o(0).toInt, o(1), o(2).toInt, o(3))
        }.toDF().registerTempTable("orders")

        orderItemsRdd.map { rec =>
          val o = rec.split(",")
          OrderItems(o(0).toInt, o(1).toInt, o(2).toInt, o(3).toInt, o(4).toFloat, o(5).toFloat)
        }.toDF().registerTempTable("order_items")

        productsRdd.map { rec =>
          val o = rec.split(",")
          Products(o(0).toInt, o(1).toInt)
        }.toDF().registerTempTable("products")

        categoriesRdd.map { rec =>
          val o = rec.split(",")
          Categories(o(0).toInt, o(1).toInt, o(2))
        }.toDF().registerTempTable("categories")

        departmentRdd.map { rec =>
          val o = rec.split(",")
          Departments(o(0).toInt, o(1))
        }.toDF().registerTempTable("departments")

        // Revenue per (order date, product) for finished orders. The aggregate
        // is aliased so it can be referenced by name after the join.
        val ordersJoin = sqlContext.sql(
          "select order_item_product_id, order_date, sum(order_item_subtotal) as revenue " +
            "from orders o, order_items oi " +
            "where order_id = order_item_order_id and order_status in ('CLOSED','COMPLETE') " +
            "group by order_date, order_item_product_id")

        // Department name for every product.
        val prodDept = sqlContext.sql(
          "select product_id, department_name " +
            "from products p, categories c, departments d " +
            "where d.department_id = c.category_department_id " +
            "and p.product_category_id = c.category_id")

        // A join without a condition is a cartesian product (every product row
        // paired with every revenue row), which is what made this step so slow.
        // Joining on the product key keeps it an equi-join.
        val rev = prodDept
          .join(ordersJoin, prodDept("product_id") === ordersJoin("order_item_product_id"))
          .select("order_date", "department_name", "product_id", "revenue")

        // DataFrames are lazy: without an action nothing is computed. The output
        // path was cleared above, so persist the result there.
        rev.rdd.saveAsTextFile(outpath.toString)
      }
    } finally {
      // Release cluster resources even when the job fails.
      sc.stop()
    }
  }
}

When I run the Spark program in the lab, I get the following error:

spark-submit --class "orders_sql" \
--master yarn \
--executor-memory 512m \
--total-executor-cores 1 \
orders_sql_2.10-1.0.jar prod /public/retail_db orders_sql
Multiple versions of Spark are installed but SPARK_MAJOR_VERSION is not set
Spark1 will be picked by default
spark.yarn.driver.memoryOverhead is set but does not apply in client mode.

Any help please

Can you paste the complete stack trace, someone might be able to help.

Itversity lab, can you advise on the error I am getting?

Have you validated whether data is generated or not?

Hi Itversity lab,

Can you help me use the rank() function with data frames?

This is my code:

/**
 * One parsed line of the NYSE end-of-day CSV file.
 *
 * Field names double as DataFrame column names, so their spelling and
 * capitalisation are kept exactly as they are.
 *
 * @param stockticker   first CSV field
 * @param date          leading characters of the second CSV field, as cut by the caller
 * @param openprices    third CSV field, parsed as a double
 * @param Highprices    fourth CSV field, parsed as a double
 * @param lowestprices  fifth CSV field, parsed as a double
 * @param closingprices sixth CSV field, parsed as a double
 * @param volume        seventh CSV field, parsed as an integer
 */
case class NYSE(
  stockticker: String,
  date: String,
  openprices: Double,
  Highprices: Double,
  lowestprices: Double,
  closingprices: Double,
  volume: Int
)

// spark-shell snippet: rank stocks by traded volume within each date bucket.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext

// On Spark 1.x window functions such as rank() are only resolved by a
// HiveContext; a plain SQLContext fails with
// "Could not resolve window function 'rank'".
val hiveContext = new HiveContext(sc)

val nyse = sc.textFile("/user/cloudera/data/nyse/nyse_2009.csv")
val nyseRecords = nyse.map(rec => {
  val r = rec.split(",")
  NYSE(r(0), r(1).substring(0,6), r(2).toDouble, r(3).toDouble, r(4).toDouble, r(5).toDouble, r(6).toInt)
})

// registerTempTable returns Unit, so it must not be chained onto the value
// assigned to nysemap: keep the DataFrame and register it separately.
val nysemap = hiveContext.createDataFrame(nyseRecords)
nysemap.registerTempTable("Nyse")

val nysegrp = nysemap.groupBy("date", "stockticker").agg(sum("volume").alias("vol"))

val wSpec1 = Window.partitionBy("date").orderBy(col("vol").desc)

// Put the rank in its own column; writing it to "stockticker" would replace
// the ticker symbol with the rank.
nysegrp.withColumn("rnk", rank().over(wSpec1)).show()

This is the HiveContext-related exception that I am getting:

scala> nysegrp.withColumn(“stockticker”,rank().over(wSpec1)).show()
org.apache.spark.sql.AnalysisException: Could not resolve window function ‘rank’. Note that, using window functions currently requires a HiveContext;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)

package Exercises
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import com.typesafe.config._
import org.apache.hadoop.fs._
import org.apache.spark.sql
import org.apache.spark.sql.functions
import org.apache.spark.sql.SQLContext

/**
 * Created on 21/09/2017.
 */


/**
 * Projection of one NYSE end-of-day record used for the volume ranking.
 *
 * Field names become the DataFrame column names referenced by the SQL query.
 *
 * @param tradedate   leading characters of the record's date field, as cut by the caller
 * @param stockticker ticker symbol field of the record
 * @param volume      traded volume field of the record
 */
case class NYSEStocks(
  tradedate: String,
  stockticker: String,
  volume: Long
)

/**
 * Spark SQL job that writes the top-N stocks by summed volume for each
 * `tradedate` bucket of the NYSE data.
 *
 * Arguments:
 *   - args(0): base input path; records are read from `<input>/nyse`
 *   - args(1): output path; it is removed first if it already exists
 *   - args(2): key of the Typesafe-config section whose `deployment` entry is
 *              used as the Spark master
 *   - args(3): N, the number of ranks to keep per trade date (an integer)
 */
object Ex08SparkSQLforEx06 {

  def main(args: Array[String]): Unit = {

    val appConf = ConfigFactory.load()
    val inputPath = args(0)
    // The output path is the second argument. Reading it from args(0) made it
    // identical to the input path, so the job recursively deleted its own input.
    val outputPath = args(1)
    // Parse N up front: a non-numeric value fails fast instead of being
    // spliced into the SQL text.
    val topN = args(3).toInt

    val conf = new SparkConf().setMaster(appConf.getConfig(args(2)).getString("deployment")).
      setAppName("Top" + args(3) + "stocks by volume ")
    val sc = new SparkContext(conf)

    try {
      // NOTE(review): on Spark 1.x, window functions such as dense_rank() are
      // only resolved by a HiveContext; if this runs on Spark 1.x, swap the
      // SQLContext for a HiveContext - confirm against the cluster's version.
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val fs = FileSystem.get(sc.hadoopConfiguration)
      val inputDir = new Path(inputPath)
      val outputDir = new Path(outputPath)

      if (!fs.exists(inputDir)) {
        // Stop here: carrying on would delete the output and then fail on the read.
        println("Input Path does not exists")
      } else {
        if (fs.exists(outputDir)) {
          fs.delete(outputDir, true)
        }

        val nyseStocksRdd = sc.textFile(inputPath + "/nyse")

        val nyseStocksMap = nyseStocksRdd.map { rec =>
          val o = rec.split(",")
          NYSEStocks(o(1).substring(0, 7), o(0), o(6).toLong)
        }.toDF()

        nyseStocksMap.registerTempTable("stock_end")

        // The rank column is aliased "rnk" so it cannot clash with the rank() function name.
        val nyseTopNSales = sqlContext.sql(
          "select b.tradedate, b.stockticker, b.totvolume from ( " +
            "select a.tradedate, a.stockticker, a.totvolume, dense_rank() " +
            " OVER (PARTITION BY tradedate ORDER BY totvolume DESC) as rnk from " +
            "( select tradedate, stockticker, sum(volume) as totvolume from stock_end " +
            " group by tradedate, stockticker ) a " +
            ") b where b.rnk <= " + topN + " order by b.tradedate, b.rnk, b.stockticker")

        nyseTopNSales.rdd.saveAsTextFile(outputPath)
      }
    } finally {
      // Always release cluster resources, including when the job fails.
      sc.stop()
    }

  }

}