Exercise 20 - SQL in Hive and Spark context

spark-sql
apache-spark
hive
#1

Problem Statement:

  • Get the top 3 departments for each day by revenue
  • Hint: join 5 tables and use analytical/windowing functions
  • For Spark, if the query fails as-is, break it into multiple pieces (a sketch follows below)
  • Do as much as possible in the query itself
  • Use RDD APIs if required

Please provide the following output:

  • Hive query
  • Spark code, if the Hive query does not work as-is
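
A single SQL statement may or may not run depending on the Spark version; if it gives trouble, one alternative is to do the joins and aggregation in SQL and apply the windowing through the DataFrame API. A minimal sketch, assuming Spark 1.6 (where the function is dense_rank; in 1.4/1.5 it was called denseRank) and a Hive-enabled context named hiveContext:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank}

// joins + aggregation in one query; windowing done afterwards on the DataFrame
val daily = hiveContext.sql(
  "select o.order_date, d.department_name, sum(oi.order_item_subtotal) as revenue " +
  "from orders o join order_items oi on o.order_id = oi.order_item_order_id " +
  "join products p on oi.order_item_product_id = p.product_id " +
  "join categories c on p.product_category_id = c.category_id " +
  "join departments d on c.category_department_id = d.department_id " +
  "where o.order_status = 'COMPLETE' " +
  "group by o.order_date, d.department_name")

// rank departments within each day, highest revenue first, and keep the top 3
val w = Window.partitionBy("order_date").orderBy(col("revenue").desc)
val top3 = daily.withColumn("dept_rank", dense_rank().over(w)).where(col("dept_rank") <= 3)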

#2

Hive query:

select * from (
  select *, dense_rank() over (partition by order_date order by revenue desc) as dept_rank
  from (
    SELECT o.order_date, d.department_name, DOUBLE(sum(oi.order_item_subtotal)) AS revenue
    FROM sumanthsharma21.orders o
    JOIN sumanthsharma21.order_items oi ON o.order_id = oi.order_item_order_id
    JOIN sumanthsharma21.products p ON oi.order_item_product_id = p.product_id
    JOIN sumanthsharma21.categories c ON c.category_id = p.product_category_id
    JOIN sumanthsharma21.departments d ON d.department_id = c.category_department_id
    WHERE o.order_status = 'COMPLETE'
    GROUP BY o.order_date, d.department_name
    ORDER BY o.order_date
  ) a
) a1
WHERE dept_rank <= 3;

Output:
a1.order_date a1.department_name a1.revenue a1.dept_rank
2013-07-25 00:00:00.0 Fan Shop 6798.890110015869 1
2013-07-25 00:00:00.0 Apparel 3279.5701179504395 2
2013-07-25 00:00:00.0 Fitness 394.92999267578125 3
2013-07-26 00:00:00.0 Fan Shop 15598.030303955078 1
2013-07-26 00:00:00.0 Apparel 8828.750286102295 2
2013-07-26 00:00:00.0 Fitness 94.0 3
2013-07-27 00:00:00.0 Fan Shop 12798.240238189697 1
2013-07-27 00:00:00.0 Apparel 5489.260177612305 2
2013-07-27 00:00:00.0 Footwear 549.989990234375 3
2013-07-28 00:00:00.0 Fan Shop 8998.710159301758 1
2013-07-28 00:00:00.0 Apparel 6589.170246124268 2

val results = sqlContext.sql("select * from (select *, dense_rank() over (partition by order_date order by revenue desc) as dept_rank from " +
  "(SELECT o.order_date, d.department_name, DOUBLE(sum(oi.order_item_subtotal)) AS revenue " +
  "FROM sumanthsharma21.orders o JOIN sumanthsharma21.order_items oi ON o.order_id = oi.order_item_order_id " +
  "JOIN sumanthsharma21.products p ON oi.order_item_product_id = p.product_id " +
  "JOIN sumanthsharma21.categories c ON c.category_id = p.product_category_id " +
  "JOIN sumanthsharma21.departments d ON d.department_id = c.category_department_id " +
  "WHERE o.order_status = 'COMPLETE' " +
  "GROUP BY o.order_date, d.department_name ORDER BY o.order_date) a) a1 where dept_rank <= 3")

scala> results.show()

+--------------------+---------------+------------------+---------+
|          order_date|department_name|           revenue|dept_rank|
+--------------------+---------------+------------------+---------+
|2013-07-25 00:00:…| Fan Shop| 6798.890110015869| 1|
|2013-07-25 00:00:…| Apparel|3279.5701179504395| 2|
|2013-07-25 00:00:…| Fitness|394.92999267578125| 3|
|2013-07-26 00:00:…| Fan Shop|15598.030303955078| 1|
|2013-07-26 00:00:…| Apparel| 8828.750286102295| 2|
|2013-07-26 00:00:…| Fitness| 94.0| 3|
|2013-07-27 00:00:…| Fan Shop|12798.240238189697| 1|
|2013-07-27 00:00:…| Apparel| 5489.260177612305| 2|
|2013-07-27 00:00:…| Footwear| 549.989990234375| 3|
|2013-07-28 00:00:…| Fan Shop| 8998.710159301758| 1|
|2013-07-28 00:00:…| Apparel| 6589.170246124268| 2|
|2013-07-29 00:00:…| Fan Shop| 18947.67035293579| 1|
|2013-07-29 00:00:…| Apparel| 8168.860248565674| 2|
|2013-07-29 00:00:…| Footwear| 189.0| 3|
|2013-07-30 00:00:…| Fan Shop|13498.170227050781| 1|
|2013-07-30 00:00:…| Apparel| 9718.66032409668| 2|
|2013-07-30 00:00:…| Fitness| 116.0| 3|
|2013-07-31 00:00:…| Fan Shop|16398.060344696045| 1|
|2013-07-31 00:00:…| Apparel| 9218.790306091309| 2|
|2013-07-31 00:00:…| Footwear| 223.9900016784668| 3|
+--------------------+---------------+------------------+---------+
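
Per the hint about breaking the statement into pieces, the same result can also be produced in two steps by registering the aggregation as a temporary table first. A sketch following the query above, assuming (as there) that sqlContext in spark-shell is Hive-enabled:

// step 1: joins and aggregation only
val dailyRevenue = sqlContext.sql(
  "SELECT o.order_date, d.department_name, DOUBLE(sum(oi.order_item_subtotal)) AS revenue " +
  "FROM sumanthsharma21.orders o " +
  "JOIN sumanthsharma21.order_items oi ON o.order_id = oi.order_item_order_id " +
  "JOIN sumanthsharma21.products p ON oi.order_item_product_id = p.product_id " +
  "JOIN sumanthsharma21.categories c ON c.category_id = p.product_category_id " +
  "JOIN sumanthsharma21.departments d ON d.department_id = c.category_department_id " +
  "WHERE o.order_status = 'COMPLETE' " +
  "GROUP BY o.order_date, d.department_name")
dailyRevenue.registerTempTable("daily_revenue")

// step 2: windowing over the pre-aggregated data
val top3 = sqlContext.sql(
  "select * from (select *, dense_rank() over (partition by order_date order by revenue desc) as dept_rank " +
  "from daily_revenue) ranked where dept_rank <= 3")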


#3

Hive:

select order_date,department_name,revenue_per_day,denseRank from
(select order_date,department_name,revenue_per_day,dense_rank()
over (partition by order_date order by revenue_per_day) as denseRank from (
select o.order_date , d.department_name , sum(oi.order_item_subtotal) revenue_per_day
from orders o join order_items oi
on o.order_id=oi.order_item_order_id
join products p
on oi.order_item_product_id = p.product_id
join categories c
on p.product_category_id = c.category_id
join departments d
on c.category_department_id= d.department_id
where o.order_status='COMPLETE'
group by o.order_date, d.department_name) query1)query2 where denseRank<=3;

SparkCode:

sqlContext.sql("select order_date,department_name,revenue_per_day,denseRank from (select order_date,department_name,revenue_per_day,dense_rank() "
                +"over (partition by order_date order by revenue_per_day) as denseRank from ("+
                "select o.order_date , d.department_name , sum(oi.order_item_subtotal) revenue_per_day "+
                "from orders o join order_items oi "+
                "on o.order_id=oi.order_item_order_id "+
                "join products p "+
                "on oi.order_item_product_id = p.product_id "+
                "join categories c "+
                "on p.product_category_id = c.category_id "+
                "join departments d "+
                "on c.category_department_id= d.department_id "+
                "where o.order_status='COMPLETE' "+
                "group by o.order_date, d.department_name) query1)query2 where denseRank<=3").show()

Output:
+--------------------+---------------+------------------+---------+
|          order_date|department_name|   revenue_per_day|denseRank|
+--------------------+---------------+------------------+---------+
|2014-07-11 00:00:…| Outdoors| 349.87| 1|
|2014-07-11 00:00:…| Footwear| 2375.72| 2|
|2014-07-11 00:00:…| Golf| 2724.75| 3|
|2013-09-02 00:00:…| Fitness| 259.94| 1|
|2013-09-02 00:00:…| Outdoors| 956.6300000000001| 2|
|2013-09-02 00:00:…| Footwear| 3489.65| 3|
|2014-01-09 00:00:…| Outdoors| 925.7900000000001| 1|
|2014-01-09 00:00:…| Golf| 3294.72| 2|
|2014-01-09 00:00:…| Footwear| 3724.599999999999| 3|
|2014-02-24 00:00:…| Outdoors| 1393.64| 1|
|2014-02-24 00:00:…| Footwear| 1729.83| 2|
|2014-02-24 00:00:…| Golf|3849.7499999999995| 3|
|2013-07-29 00:00:…| Fitness| 319.98| 1|
|2013-07-29 00:00:…| Outdoors|1648.4800000000002| 2|
|2013-07-29 00:00:…| Footwear| 4644.54| 3|
|2014-02-19 00:00:…| Fitness| 559.87| 1|
|2014-02-19 00:00:…| Outdoors| 1065.69| 2|
|2014-02-19 00:00:…| Footwear| 3765.57| 3|
|2014-01-30 00:00:…| Outdoors| 818.6900000000002| 1|
|2014-01-30 00:00:…| Golf| 4679.7| 2|
+--------------------+---------------+------------------+---------+
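
Note that without desc in the window's order by, dense_rank assigns rank 1 to the day's lowest-revenue department, so the output above is the bottom 3 per day. To get the top 3 the problem asks for, the window clause would be:

dense_rank() over (partition by order_date order by revenue_per_day desc) as denseRank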


#4

#Hive Query

select * from
(select q.order_date, q.department_name, q.revenue_per_day,
dense_rank() over (partition by order_date order by revenue_per_day desc) as rnk
from (select o.order_date as order_date, d.department_name as department_name, sum(oi.order_item_subtotal) as revenue_per_day
from orders o join order_items oi
on o.order_id = oi.order_item_order_id
join products p
on oi.order_item_product_id = p.product_id
join categories c
on p.product_category_id = c.category_id
join departments d
on c.category_department_id = d.department_id
where o.order_status = 'COMPLETE'
group by o.order_date, d.department_name) q) q1
where q1.rnk <= 3;

#Spark code :construction:
(will post in a while)
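
In the meantime, a sketch of what the Spark version might look like (assuming a HiveContext-backed sqlContext, as in the other replies):

sqlContext.sql("select * from " +
  "(select q.order_date, q.department_name, q.revenue_per_day, " +
  "dense_rank() over (partition by order_date order by revenue_per_day desc) as rnk " +
  "from (select o.order_date, d.department_name, sum(oi.order_item_subtotal) as revenue_per_day " +
  "from orders o join order_items oi on o.order_id = oi.order_item_order_id " +
  "join products p on oi.order_item_product_id = p.product_id " +
  "join categories c on p.product_category_id = c.category_id " +
  "join departments d on c.category_department_id = d.department_id " +
  "where o.order_status = 'COMPLETE' " +
  "group by o.order_date, d.department_name) q) q1 " +
  "where q1.rnk <= 3").show()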


#5

select * from
(select order_date, department_name, revenue,
dense_rank() over (partition by q.order_date order by q.revenue) rnk
from
(select o.order_date,
dept.department_name,
sum(oi.order_item_subtotal) revenue
from orders o join order_items oi
on o.order_id = oi.order_item_order_id
join products pd
on oi.order_item_product_id = pd.product_id
join categories ct
on pd.product_category_id = ct.category_id
join departments dept
on ct.category_department_id = dept.department_id
group by o.order_date, dept.department_name ) q)q1
where q1.rnk <= 3;

2014-02-18 00:00:00 Fitness 975.8800163269043 1
2014-02-18 00:00:00 Outdoors 3629.079999923706 2
2014-02-18 00:00:00 Footwear 11538.420030593872 3
2014-06-13 00:00:00 Fitness 539.8400039672852 1
2014-06-13 00:00:00 Outdoors 2911.270040512085 2
2014-06-13 00:00:00 Footwear 14622.450016021729 3
2013-08-18 00:00:00 Fitness 779.8600158691406 1
2013-08-18 00:00:00 Outdoors 4036.730005264282 2
2013-08-18 00:00:00 Footwear 9574.87990951538 3
2013-09-16 00:00:00 Fitness 434.9300022125244 1
2013-09-16 00:00:00 Outdoors 1896.53000831604 2
2013-09-16 00:00:00 Golf 7544.310062408447 3
2013-12-24 00:00:00 Fitness 343.8999938964844 1
2013-12-24 00:00:00 Outdoors 1129.5699996948242 2
2013-12-24 00:00:00 Footwear 8060.040016174316 3
2013-09-26 00:00:00 Fitness 873.8900108337402 1
2013-09-26 00:00:00 Outdoors 3742.8999919891357 2
2013-09-26 00:00:00 Golf 17448.780113220215 3
2013-07-28 00:00:00 Fitness 84.98000144958496 1
2013-07-28 00:00:00 Outdoors 1773.6400089263916 2
2013-07-28 00:00:00 Footwear 7537.990001678467 3
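
Note that, unlike the other replies, this query has no order_status filter, so the revenue figures include pending and cancelled orders, which is why the numbers differ from the outputs above. Adding the same filter the others use, just before the group by, makes the results comparable:

join departments dept
on ct.category_department_id = dept.department_id
where o.order_status = 'COMPLETE'
group by o.order_date, dept.department_name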


#6

select order_date , department_name , revenue_per_day, rnk
from
(select order_date , department_name , revenue_per_day,
dense_rank() over (partition by order_date order by revenue_per_day desc) rnk
from
(select o.order_date , d.department_name , sum(oi.order_item_subtotal) revenue_per_day
from orders o join order_items oi
on o.order_id=oi.order_item_order_id
join products p
on oi.order_item_product_id = p.product_id
join categories c
on p.product_category_id = c.category_id
join departments d
on c.category_department_id= d.department_id
where o.order_status='COMPLETE'
group by o.order_date, d.department_name
)q)q1
where rnk <=3


val appconf = ConfigFactory.load()
val conf = new SparkConf().setAppName("Retail DataFrame").setMaster(appconf.getConfig(args(2)).getString("deployement"))
val sc = new SparkContext(conf)
// window functions and Hive tables need a HiveContext rather than a plain SQLContext in Spark 1.x
val sqlContext = new HiveContext(sc)
val outputPath = args(1)
val revenue = sqlContext.sql("select order_date, department_name, revenue_per_day, denseRank from (select order_date, department_name, revenue_per_day, dense_rank() " +
  "over (partition by order_date order by revenue_per_day desc) as denseRank from (" +
  "select o.order_date, d.department_name, sum(oi.order_item_subtotal) revenue_per_day " +
  "from orders o join order_items oi " +
  "on o.order_id = oi.order_item_order_id " +
  "join products p " +
  "on oi.order_item_product_id = p.product_id " +
  "join categories c " +
  "on p.product_category_id = c.category_id " +
  "join departments d " +
  "on c.category_department_id = d.department_id " +
  "where o.order_status = 'COMPLETE' " +
  "group by o.order_date, d.department_name) query1) query2 where denseRank <= 3")
revenue.rdd.saveAsTextFile(outputPath)
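
One caveat with saving this way: .rdd.saveAsTextFile writes each Row's default toString, producing lines like [2013-07-25 00:00:00.0,Fan Shop,6798.89,1]. If a plain delimited file is wanted, map each Row to a string first, for example:

revenue.rdd
  .map(row => row.mkString("\t")) // tab-separated: order_date, department_name, revenue_per_day, denseRank
  .saveAsTextFile(outputPath)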


#7

#SparkCode

sqlContext.sql("select order_date,department_name,revenue_per_day,denseRank from (select order_date,department_name,revenue_per_day,dense_rank() "
+“over (partition by order_date order by revenue_per_day) as denseRank from (”+
"select o.order_date , d.department_name , sum(oi.order_item_subtotal) revenue_per_day "+
"from orders o join order_items oi "+
"on o.order_id=oi.order_item_order_id "+
"join products p "+
"on oi.order_item_product_id = p.product_id "+
"join categories c "+
"on p.product_category_id = c.category_id "+
"join departments d "+
"on c.category_dept_id= d.department_id "+
"where o.order_status=‘COMPLETE’ "+
“group by o.order_date, d.department_name) query1)query2 where denseRank<=3”).show()


#Hive Query

select * from (select *, dense_rank() over (partition by order_date order by revenue desc) as dept_rank
from
(SELECT o.order_date, d.department_name, DOUBLE(sum(oi.order_item_subtotal)) AS revenue
FROM orders o JOIN order_items oi ON o.order_id = oi.order_item_order_id
JOIN products p ON oi.order_item_product_id = p.product_id
JOIN categories c ON c.category_id = p.product_category_id
JOIN departments d ON d.department_id = c.category_department_id
WHERE o.order_status = 'COMPLETE'
GROUP BY o.order_date, d.department_name
) a) a1 where dept_rank <= 3;


#8

Hive Query:

select * from
(select *, dense_rank() OVER (PARTITION BY order_date ORDER BY revenue DESC)rank from
(select o.order_date,d.department_name,sum(oi.order_item_subtotal) revenue
from orders o join order_items oi
on oi.order_item_order_id=o.order_id
join products p
on p.product_id=oi.order_item_product_id
join categories c
on c.category_id=p.product_category_id
join departments d
on d.department_id=c.category_department_id
where o.order_status='COMPLETE'
group by o.order_date,d.department_name)tmp) tmp1
where rank <= 3;

Spark Sql:

sqlContext.sql("select * from (select *,dense_rank() "
+“over (partition by order_date order by revenue_per_day) as denseRank from (”+
"select o.order_date , d.department_name , sum(oi.order_item_subtotal) revenue_per_day "+
"from mahesh007.orders o join mahesh007.order_items oi "+
"on o.order_id=oi.order_item_order_id "+
"join mahesh007.products p "+
"on oi.order_item_product_id = p.product_id "+
"join mahesh007.categories c "+
"on p.product_category_id = c.category_id "+
"join mahesh007.departments d "+
"on c.category_department_id= d.department_id "+
"where o.order_status=‘COMPLETE’ "+
“group by o.order_date, d.department_name) query1)query2 where denseRank<=3”).show()

Output:

+--------------------+---------------+------------------+---------+
|          order_date|department_name|   revenue_per_day|denseRank|
+--------------------+---------------+------------------+---------+
|2014-07-11 00:00:…| Outdoors|349.87000465393066| 1|
|2014-07-11 00:00:…| Footwear|2375.7200050354004| 2|
|2014-07-11 00:00:…| Golf| 2724.750030517578| 3|
|2013-09-02 00:00:…| Fitness| 259.9400005340576| 1|
|2013-09-02 00:00:…| Outdoors| 956.6299915313721| 2|
|2013-09-02 00:00:…| Footwear| 3489.649971008301| 3|
|2014-01-09 00:00:…| Outdoors| 925.7900085449219| 1|
|2014-01-09 00:00:…| Golf| 3294.720027923584| 2|
|2014-01-09 00:00:…| Footwear| 3724.599998474121| 3|
|2014-02-24 00:00:…| Outdoors|1393.6399765014648| 1|
|2014-02-24 00:00:…| Footwear|1729.8300323486328| 2|
|2014-02-24 00:00:…| Golf|3849.7500076293945| 3|
|2013-07-29 00:00:…| Fitness| 319.9800109863281| 1|
|2013-07-29 00:00:…| Outdoors| 1648.479995727539| 2|
|2013-07-29 00:00:…| Footwear| 4644.539966583252| 3|
|2014-02-19 00:00:…| Fitness| 559.870002746582| 1|
|2014-02-19 00:00:…| Outdoors|1065.6900081634521| 2|
|2014-02-19 00:00:…| Footwear| 3765.569953918457| 3|
|2014-01-30 00:00:…| Outdoors| 818.6900043487549| 1|
|2014-01-30 00:00:…| Golf| 4679.700023651123| 2|
+--------------------+---------------+------------------+---------+
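
The hint also allows falling back to the RDD API for the ranking piece: run just the inner aggregation query, then compute the top 3 per day with plain RDD operations. A sketch, assuming the aggregation result is in a DataFrame named dailyRevenue (the name is illustrative):

val top3PerDay = dailyRevenue.rdd
  .map(row => (row.getAs[java.sql.Timestamp]("order_date"),
    (row.getAs[String]("department_name"), row.getAs[Double]("revenue_per_day"))))
  .groupByKey()
  .flatMap { case (date, depts) =>
    // unlike dense_rank, ties are broken arbitrarily here
    depts.toList.sortBy(-_._2).take(3).map { case (dept, rev) => (date, dept, rev) }
  }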


#9

Spark / Hive

select * from (select *, dense_rank() over (partition by order_date order by revenue desc) as dept_rank from
(select o.order_date, dpt.department_name, DOUBLE(sum(oi.order_item_subtotal)) as revenue from
orders o join order_items oi
on o.order_id = oi.order_item_order_id
join products pr on oi.order_item_product_id = pr.product_id
join categories cat on cat.category_id = pr.product_category_id
join departments dpt on dpt.department_id = cat.category_department_id
where o.order_status = 'COMPLETE'
group by o.order_date, dpt.department_name) as tmp) tmp1 where dept_rank <= 3;

val results=sqlContext.sql(s"select * from (select * ,dense_rank() over(PARTITION BY order_date order by revenune desc) as dept_rank from"
| +"(select o.order_date,dpt.department_name,DOUBLE(sum(oi.order_item_subtotal)) as revenune from"
| +" arun.orders o join arun.order_items oi on o.order_id =oi.order_item_id"
| +" join arun.products pr on oi.order_item_product_id=pr.product_id “
| +” join arun.categories cat on cat.category_id =pr.product_category_id"
| +" join arun.department dpt on dpt.department_id=cat.category_department_id"
| +" where o.order_status=‘COMPLETE’"
| +" group by o.order_date,dpt.department_name)as tmp)tmp1 where dept_rank <=3")

 results.show()

#10

package org.test.sparkDataFrames

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

import com.typesafe.config.ConfigFactory

object SqlHiveEx20 {
  def main(args: Array[String]): Unit = {
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().setAppName("spark exercise 17")
      .setMaster(appConf.getConfig(args(2)).getString("deployment"))
    val sc = new SparkContext(conf)
    val inputPath = args(0)
    val outputPath = args(1)
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))
    if (!inputPathExists) {
      println("Input path does not exist")
      return
    }

    if (outputPathExists) {
      fs.delete(new Path(outputPath), true)
    }

    // window functions and Hive tables need a HiveContext rather than a plain SQLContext
    val hiveContext = new HiveContext(sc)

    hiveContext.sql("use paramesh")

    hiveContext.sql("select order_date, department_name, revenue_per_day, denseRank from (select order_date, department_name, revenue_per_day, dense_rank() " +
      "over (partition by order_date order by revenue_per_day desc) as denseRank from (" +
      "select o.order_date, d.department_name, sum(oi.order_item_subtotal) revenue_per_day " +
      "from orders o join order_items oi " +
      "on o.order_id = oi.order_item_order_id " +
      "join products p " +
      "on oi.order_item_product_id = p.product_id " +
      "join categories c " +
      "on p.product_category_id = c.category_id " +
      "join departments d " +
      "on c.category_department_id = d.department_id " +
      "where o.order_status = 'COMPLETE' " +
      "group by o.order_date, d.department_name) query1) query2 where denseRank <= 3").show() // show() so the query actually runs
  }
}
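
For running it, the ConfigFactory lookup implies an application.properties with one block per environment; the exact names below are assumptions for illustration, not part of the original post:

# application.properties (assumed): args(2) selects the block, e.g. "dev" or "prod"
dev.deployment = local
prod.deployment = yarn-client

# assumed invocation: args(0) = input path, args(1) = output path, args(2) = environment
spark-submit --class org.test.sparkDataFrames.SqlHiveEx20 sqlhiveex20.jar /user/paramesh/in /user/paramesh/out prod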
