Exercise 06 - Get details of top 5 customers by revenue for each month

pyspark
spark-shell
apache-spark
scala
python

#1

Before attempting these questions, make sure you prepare by going through appropriate material.



Duration: 20 to 30 minutes

  • Tables should be in the Hive database <YOUR_USER_ID>_retail_db_txt:
    • orders
    • order_items
    • customers
  • The time to create the database and tables need not be counted. Make sure to go back to the Spark SQL module to create the tables and load the data.
  • Get the details of the top 5 customers by revenue for each month.
  • We need all the details of the customer, along with the month and the revenue per month.
  • Data needs to be sorted by month in ascending order and by revenue per month in descending order.
  • Create the table top5_customers_per_month in <YOUR_USER_ID>_retail_db_txt.
  • Insert the output into the newly created table (a sketch of the expected query shape follows this list).
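For orientation, here is a minimal sketch of the query shape this exercise calls for, assuming the standard retail_db column names; the solutions in the replies below flesh this out and vary the details:

top5 = sqlContext.sql("""
    select c.*, q.order_month, q.revenue_per_month
    from (
        -- revenue per customer per month, ranked within each month
        select o.order_customer_id,
               substr(o.order_date, 1, 7) as order_month,
               sum(oi.order_item_subtotal) as revenue_per_month,
               dense_rank() over (partition by substr(o.order_date, 1, 7)
                                  order by sum(oi.order_item_subtotal) desc) as rnk
        from orders o
        join order_items oi on o.order_id = oi.order_item_order_id
        group by o.order_customer_id, substr(o.order_date, 1, 7)
    ) q
    join customers c on q.order_customer_id = c.customer_id
    where q.rnk <= 5
    order by q.order_month, q.revenue_per_month desc""")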

#2

val orders = sc.textFile("/user/cloudera/test/orders")
val ordersDF = orders.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1), rec.split(",")(2).toInt, rec.split(",")(3))).
  toDF("order_id", "order_date", "order_customer_id", "order_status")
ordersDF.registerTempTable("orders1")
val orderItems = sc.textFile("/user/cloudera/test/order_items")
val orderItemsDF = orderItems.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1).toInt, rec.split(",")(2).toInt, rec.split(",")(3).toInt, rec.split(",")(4).toFloat, rec.split(",")(5).toFloat)).
  toDF("order_item_id", "order_item_order_id", "order_item_product_id", "order_item_quantity", "order_item_subtotal", "order_item_product_price")
orderItemsDF.registerTempTable("orderItems1")
val customers = sc.textFile("/user/cloudera/test/customers")
val customersDF = customers.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1), rec.split(",")(2), rec.split(",")(3), rec.split(",")(4), rec.split(",")(5), rec.split(",")(6), rec.split(",")(7), rec.split(",")(8))).
  toDF("customer_id", "customer_fname", "customer_lname", "customer_email", "customer_password", "customer_street", "customer_city", "customer_state", "customer_zipcode")

customersDF.registerTempTable("customers1")

sqlContext.sql("select * from(select customer_id,customer_fname,customer_lname,customer_email,customer_password,customer_street,customer_city,customer_state,customer_zipcode,month,total_amount_per_month,dense_rank() over (partition by month order by total_amount_per_month desc) as rnk from (select distinct * from(select c.customer_id,c.customer_fname,c.customer_lname,c.customer_email,c.customer_password,c.customer_street,c.customer_city,c.customer_state,c.customer_zipcode,substr(o.order_date,1,7) month,sum(order_item_subtotal) total_amount_per_month from customers1 c join orders1 o on o.order_customer_id=c.customer_id join orderItems1 oi on o.order_id=oi.order_item_order_id group by c.customer_id,c.customer_id,customer_fname,customer_lname,customer_email,customer_password,customer_street,customer_city,customer_state,customer_zipcode,substr(o.order_date,1,7))q)p)e where rnk <=5 order by month,total_amount_per_month desc").show

#3

Please let me know if I did it the wrong way.


#4

I was able to achieve the above using DataFrame and SQL operations, but can it also be achieved using the core Spark APIs? If yes, please give some pointers on extracting the month from the order date and applying the rank function.
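For pointers, here is a minimal pyspark sketch (the paths and column positions are assumptions, not from the post above): the month is just the first 7 characters of order_date, and the rank can be approximated in core Spark by grouping per month and taking the 5 highest revenues in each group:

orders = sc.textFile("/public/retail_db/orders")
order_items = sc.textFile("/public/retail_db/order_items")

# (order_id, (month, customer_id)) -- month = "yyyy-MM" prefix of order_date
ordersMap = orders.map(lambda o: (int(o.split(",")[0]),
                                  (o.split(",")[1][:7], int(o.split(",")[2]))))
# (order_id, subtotal)
orderItemsMap = order_items.map(lambda oi: (int(oi.split(",")[1]),
                                            float(oi.split(",")[4])))

# ((month, customer_id), total_revenue)
revenuePerMonthPerCustomer = ordersMap.join(orderItemsMap). \
    map(lambda rec: (rec[1][0], rec[1][1])). \
    reduceByKey(lambda a, b: a + b)

# (month, customer_id, revenue) for the 5 highest revenues per month
# (note: this takes the top 5 rows, not all ties the way dense_rank would)
top5PerMonth = revenuePerMonthPerCustomer. \
    map(lambda rec: (rec[0][0], (rec[0][1], rec[1]))). \
    groupByKey(). \
    flatMap(lambda rec: [(rec[0], t[0], t[1]) for t in
                         sorted(rec[1], key=lambda t: -t[1])[:5]])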


#5

SparkSQL:

sqlContext.sql("create database imthi_exercise")

sqlContext.sql("use imthi_exercise")

sqlContext.sql("create table orders (order_id int, order_date varchar(40), order_cust_id int, order_status varchar(40)) row format delimited fields terminated by ',' stored as textfile")

sqlContext.sql("create table order_items(order_item_id int, order_item_order_id int, product_id int, product_qty int, order_item_subtotal float, order_item_price float) row format delimited fields terminated by ',' stored as textfile")

sqlContext.sql("create table customer (cust_id int, cust_fname varchar(40), cust_lname varchar(40), cust_email varchar(40), cust_password varchar(40), cust_street varchar(40), cust_city varchar(40), cust_state varchar(40), cust_zip varchar(40)) row format delimited fields terminated by ',' stored as textfile")

sqlContext.sql("show tables")

sqlContext.sql("load data local inpath '/data/retail_db/orders/part-00000' into table orders")

sqlContext.sql("load data local inpath '/data/retail_db/order_items' into table order_items")

sqlContext.sql("load data local inpath '/data/retail_db/customers' into table customer")

sqlContext.sql("select * from customer limit 10").show

val final_output = sqlContext.sql("select c.*,order_date,revenue from(select order_cust_id, revenue, order_date, (dense_rank() over(partition by order_date order by revenue desc)) as rank from (select round(sum(order_item_subtotal),2) revenue, substr(order_date,1,7) as order_date, order_cust_id from orders inner join order_items on order_id = order_item_order_id group by substr(order_date,1,7) , order_cust_id)a)b inner join customer c on order_cust_id = cust_id where rank <= 5 order by order_date,revenue desc ")

final_output.saveAsTable("top5_customers_per_month")

sqlContext.setConf("spark.sql.shuffle.partitions", "2")

sqlContext.sql("select * from top5_customers_per_month").show(10)

final_output.coalesce(4).write.mode("overwrite").saveAsTable("top5_customers_per_month")

Spark API:

val orderRDD = sc.textFile("/public/retail_db/orders")
val order_req = orderRDD.map(a => (a.split(",")(0).toInt,(a.split(",")(1).substring(0,7).replace("-","").toInt,a.split(",")(2).toInt)))

val order_itemRDD = sc.textFile("/public/retail_db/order_items")
val order_item_req = order_itemRDD.map(a => (a.split(",")(1).toInt,a.split(",")(4).toFloat))

val customerRDD = sc.textFile("/public/retail_db/customers")
val customer_req = customerRDD.map(a => (a.split(",")(0).toInt,a))

val order_order_item_join = order_req.join(order_item_req)

val sum_total = order_order_item_join.map(e => e._2).reduceByKey(_ + _).map(e => (e._1._1, (e._1._2, e._2)))

val date_group = sum_total.groupByKey()

def getTopNcustomer(a: (Int, Iterable[(Int, Float)]), topn: Int) = {
  val x = a._2.toList.sortBy(o => -o._2)
  val high_total = x.map(t => t._2).distinct.take(topn).min
  val final_customer = x.takeWhile(q => q._2 >= high_total)
  final_customer
}

val filter_cus = date_group.flatMap(e => getTopNcustomer(e, 5).map(a => (e._1,a)))

val rearrange = filter_cus.map(c => (c._2._1,(c._2._2,c._1)))

val customer_join = rearrange.join(customer_req).map(c => c._2).sortBy(o => (o._1._2,-o._1._1))

val final_cust = customer_join.map(c => c._2 + "," + c._1._2 + "," + c._1._1)


#6

sqlContext.sql("""select * from (
    select rank() over (partition by order_month order by total_revenue desc) as rank,
           total_revenue, order_month, customer_id
    from (select sum(oi.order_subtotal) as total_revenue,
                 substr(o.order_date, 1, 7) as order_month,
                 o.customer_id
          from order_items oi
          join orders o on o.order_id = oi.order_item_order_id
          group by substr(o.order_date, 1, 7), o.customer_id) a) a
    where a.rank < 6""").show(200)


#7

Is the solution below correct?
Even if we use RDDs, in the end, to get the rank (i.e., the top 5), we need to convert to a DataFrame.

df = h.sql("select substr(o.order_date, 0, 7) as month, o.order_customer_id, sum(oi.order_item_subtotal) as revenue from retail_db.orders o inner join retail_db.order_items oi on o.order_id = oi.order_item_order_id group by substr(o.order_date, 0, 7), o.order_customer_id")

df.registerTempTable("revenue")

rankedDf = h.sql("select c.*, r.month, r.revenue, rank() over (partition by r.month order by r.revenue desc) as rank from revenue r inner join retail_db.customers c on r.order_customer_id = c.customer_id order by r.month, r.revenue desc")

rankedDf.registerTempTable("rankedRevenue")

resultDf = h.sql("select * from rankedRevenue where rank < 6 order by month, rank")

finalDf = resultDf.drop(resultDf.rank)

finalDf.saveAsTable("retail_db.top5_customers_per_month")


#8

Can anyone provide the code in Python for this exercise?


#9

In Pyspark - using Spark SQL & DataFrames:

results_DF = sqlContext.sql("select q.month, q.customer_id, q.revenue_per_month_per_customer from \
(select customer_id, cast(concat(substr(order_date, 1, 4), substr(order_date, 6, 2)) as int) month, \
sum(order_item_subtotal) revenue_per_month_per_customer, \
rank() over (partition by cast(concat(substr(order_date, 1, 4), substr(order_date, 6, 2)) as int) order by sum(order_item_subtotal) desc) rank \
from orders join order_items \
on order_id = order_item_order_id \
join customers \
on order_customer_id = customer_id \
group by cast(concat(substr(order_date, 1, 4), substr(order_date, 6, 2)) as int), customer_id \
order by month asc, revenue_per_month_per_customer desc) q \
where q.rank <=5")

results_DF.registerTempTable("results_DF")

solution_DF = sqlContext.sql("select rdf.month, rdf.revenue_per_month_per_customer, c.* from results_DF rdf join customers c on rdf.customer_id = c.customer_id")

solution_DF.registerTempTable("solution_DF")

sqlContext.sql("create table top5_customers_per_month \
as \
SELECT * from solution_DF")

sqlContext.sql("select * from top5_customers_per_month").show()

#10

@sneha_ananthan Can you provide me the code using pyspark RDD?


#11

Sure, getting to the solution is lengthier in this approach though.

In pyspark - using RDD and Spark Core APIs:

orders = sc.textFile("/apps/hive/warehouse/snehaananthan_retail_db_txt.db/orders")
order_items = sc.textFile("/apps/hive/warehouse/snehaananthan_retail_db_txt.db/order_items")
customers = sc.textFile("/apps/hive/warehouse/snehaananthan_retail_db_txt.db/customers")

for i in customers.take(10): print(i)

ordersMap = orders. \
map(lambda o: 
	(int(o.split(",")[0]), (o.split(",")[1][:7].replace('-', ''), int(o.split(",")[2])))
	)

orderItemsMap = order_items. \
map(lambda oi: 
	(int(oi.split(",")[1]), float(oi.split(",")[4]))
	)

customersMap = customers. \
map(lambda c: 
	(int(c.split(",")[0]), 
		(int(c.split(",")[0]), 
			c.split(",")[1], 
			c.split(",")[2], 
			c.split(",")[3], 
			c.split(",")[4], 
			c.split(",")[5], 
			c.split(",")[6], 
			c.split(",")[7], 
			c.split(",")[8] 
		)
	)
)

ordersJoin = ordersMap. \
join(orderItemsMap). \
map(lambda (k, v): v)
for i in ordersJoin.take(10): print(i)

from operator import add
revenuePerDatePerCustomer = ordersJoin. \
reduceByKey(add)
for i in revenuePerDatePerCustomer.take(10): print(i)

customersJoin = revenuePerDatePerCustomer. \
map(lambda (k, v): 
	(k[1], (k[0], v))
	). \
join(customersMap)
for i in customersJoin.take(10): print(i)

finalMapSortedByMonth = customersJoin. \
map(lambda (k, v): 
	(v[0][0], (v[0][0], v[0][1], 
				v[1][0], v[1][1], v[1][2], v[1][3], v[1][4], v[1][5], v[1][6], v[1][7], v[1][8]))
	). \
sortByKey()
for i in finalMapSortedByMonth.take(10): print(i)


top5CustomersByRevenuePerMonth = finalMapSortedByMonth. \
groupByKey(). \
flatMap(lambda (k, v): 
	sorted(list(v), key=(lambda l: float(l[1])), reverse=True)[:5]
	)

for i in top5CustomersByRevenuePerMonth.take(10): print(i)

from pyspark.sql import Row
top5CustomersByRevenuePerMonth_DF = top5CustomersByRevenuePerMonth. \
map(lambda r: 
	Row(month=r[0], revenue_per_month=float(r[1]), 
		customer_id=int(r[2]), 
		customer_fname=r[3], 
		customer_lname=r[4], 
		customer_email=r[5], 
		customer_password=r[6], 
		customer_street=r[7],
		customer_city=r[8], 
		customer_state=r[9], 
		customer_zipcode=r[10])
	).toDF()

top5CustomersByRevenuePerMonth_DF.registerTempTable("top5CustomersByRevenuePerMonth_DF")

sqlContext.sql("create table top5_customers_per_month \
as \
SELECT * from top5CustomersByRevenuePerMonth_DF")

sqlContext.sql("select * from top5_customers_per_month").show()

#12

from pyspark.sql import Row

cust = open('/data/retail_db/customers/part-00000').read().splitlines()
custRdd = sc.parallelize(cust)

custDF = custRdd.map(lambda x: Row(
    cust_id=int(x.split(',')[0]),
    fname=x.split(',')[1],
    lname=x.split(',')[2],
    email=x.split(',')[3],
    ssn=x.split(',')[4],
    city=x.split(',')[6],
    state=x.split(',')[7],
    zip=x.split(',')[8]
    )
).toDF()

custDF.registerTempTable('customers')

order = open('/data/retail_db/orders/part-00000').read().splitlines()
orderRdd = sc.parallelize(order)

orderDF = orderRdd.map(lambda x: Row(
    order_id=int(x.split(',')[0]),
    order_date=x.split(',')[1],
    customer_id=int(x.split(',')[2]),
    order_status=x.split(',')[3])).toDF()

orderDF.registerTempTable('orders')

order_items = open('/data/retail_db/order_items/part-00000').read().splitlines()
orderItemsRdd = sc.parallelize(order_items)

orderitemsDF = orderItemsRdd.map(lambda x: Row(
    order_item_id=x.split(',')[0],
    order_id=int(x.split(',')[1]),
    product_id=int(x.split(',')[2]),
    quantity=x.split(',')[3],
    subtotal=float(x.split(',')[4]),
    product_price=x.split(',')[5])).toDF()

orderitemsDF.registerTempTable('orderitems')

df3 = sqlContext.sql('''
select q.monthyear, q.cust_id, q.total,
       q.rank, d.fname, d.lname, d.city,
       d.state, d.zip,
       d.email, d.ssn
from
(
    select c.cust_id,
           date_format(to_date(o.order_date), 'yyyyMM') monthyear,
           round(sum(oi.subtotal), 2) total,
           dense_rank() over (partition by date_format(to_date(o.order_date), 'yyyyMM') order by round(sum(oi.subtotal), 2) desc) rank
    from customers c
    join orders o on c.cust_id = o.customer_id
    join orderitems oi on oi.order_id = o.order_id
    group by c.cust_id,
             date_format(to_date(o.order_date), 'yyyyMM')
) q
join customers d on q.cust_id = d.cust_id
where rank <= 5
order by monthyear, total desc
''')

sqlContext.sql('''
create table reachnetha.topcustomers
(
    monthyear int,
    cust_id int,
    total float,
    rank int,
    fname string,
    lname string,
    city string,
    state string,
    zip string,
    email string,
    ssn string
) stored as orc
''')
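The table created above is never populated in the post; one way to finish, as a sketch assuming the df3 from above (monthyear comes back as a string from date_format, so cast it to match the int column):

# top_customers_stg is just a scratch temp-table name for this sketch
df3.registerTempTable('top_customers_stg')
sqlContext.sql('''insert into table reachnetha.topcustomers
select cast(monthyear as int), cust_id, total, rank, fname, lname,
       city, state, zip, email, ssn
from top_customers_stg''')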


#13

Do we get such questions in the exam, where you have to use dense_rank()? Of the 2 Hive questions, is this something we should expect?

It looks very difficult for a person who is not so well versed in SQL, though.
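For what it's worth, the difference between rank() and dense_rank() only shows up on ties, which is also why a "rank <= 5" filter can return different numbers of rows depending on which function you use. A quick illustration with made-up revenues:

# Illustration only, with made-up values: rank() leaves gaps after ties,
# dense_rank() does not.
from pyspark.sql import Row
tie_demo = sc.parallelize([Row(revenue=300.0), Row(revenue=300.0), Row(revenue=200.0)]).toDF()
tie_demo.registerTempTable("tie_demo")
sqlContext.sql("""select revenue,
       rank() over (order by revenue desc) as rnk,
       dense_rank() over (order by revenue desc) as dense_rnk
from tie_demo""").show()
# 300.0 -> rnk 1, dense_rnk 1
# 300.0 -> rnk 1, dense_rnk 1
# 200.0 -> rnk 3, dense_rnk 2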


#14

SparkSQL and DF.

orders = sc.textFile("/apps/hive/warehouse/…")
order_items = sc.textFile("/apps/hive/warehouse/…")
customers = sc.textFile("/apps/hive/warehouse/…")

from pyspark.sql import Row
ordersDF = orders. \
map(lambda o: Row(order_id=int(o.split(",")[0]),
                  ordered_date=int(o.split(",")[1].split(" ")[0].replace("-", "")[:6]),
                  order_customer_id=int(o.split(",")[2]))). \
toDF()

ordersDF.registerTempTable("r_orders")

sqlContext.sql("select * from r_orders limit 10").show()

order_itemsDF = order_items. \
map(lambda oi: Row(order_item_order_id=int(oi.split(",")[1]),
                   order_item_subtotal=float(oi.split(",")[4]))). \
toDF()

order_itemsDF.registerTempTable("t_order_items")

sqlContext.sql("select * from t_order_items limit 10").show()

order_revenue_df = sqlContext.sql("select order_item_order_id, round(sum(order_item_subtotal), 2) as order_revenue from t_order_items group by order_item_order_id order by order_item_order_id")

order_revenue_df.registerTempTable("r_order_items")

customersDF = customers. \
map(lambda c: Row(customer_id=int(c.split(",")[0]), customer_fname=c.split(",")[1],
                  customer_lname=c.split(",")[2], customer_email=c.split(",")[3], customer_password=c.split(",")[4],
                  customer_street=c.split(",")[5], customer_city=c.split(",")[6], customer_state=c.split(",")[7],
                  customer_zipcode=c.split(",")[8])). \
toDF()

customersDF.registerTempTable("r_customers")

resultDF = sqlContext.sql("""SELECT b.ordered_date,
       b.total_revenue_per_month,
       b.rev_rank,
       c.customer_id,
       c.customer_fname,
       c.customer_lname
FROM (SELECT a.order_customer_id,
             a.ordered_date,
             a.total_revenue_per_month,
             DENSE_RANK() OVER (PARTITION BY a.ordered_date ORDER BY a.total_revenue_per_month DESC)
               AS rev_rank
      FROM (SELECT o.order_customer_id, o.ordered_date, SUM(oi.order_revenue) AS total_revenue_per_month
            FROM r_orders o JOIN r_order_items oi ON o.order_id = oi.order_item_order_id
            GROUP BY o.order_customer_id, o.ordered_date) a) b
JOIN r_customers c ON b.order_customer_id = c.customer_id
WHERE b.rev_rank <= 5
ORDER BY b.ordered_date, b.total_revenue_per_month DESC""")

resultDF.registerTempTable("final_result")

sqlContext.sql("create table top5_customers_per_month as select ordered_date, total_revenue_per_month, customer_id, customer_fname, customer_lname from final_result")


#15

Hi Imthiyas,

When I use your code for ingesting the data directly from the Unix path into the Hive table with the query:
sqlContext.sql("load data local inpath '/data/retail_db/orders/part-00000' into table orders")
the rows are getting inserted, but with NULL values. The row count is correct, but the actual column values are not getting loaded.
Can you please tell me where I am going wrong?
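A common cause is a delimiter or column-type mismatch between the table definition and the data: Hive returns NULL for any field it cannot parse. One way to double-check what Hive actually recorded for the table, as a sketch against the imthi_exercise tables from the post above:

# Sketch: compare the delimiter and column types Hive recorded against the
# raw file (watch out for curly quotes around ',' if the DDL was copy-pasted
# from a web page).
sqlContext.sql("use imthi_exercise")
for row in sqlContext.sql("describe formatted orders").collect():
    print(row)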


#16
//Solution to Problem 6 - Using the Spark API, DataFrames, and Spark SQL
//Total time taken: 35-40 mins (most of it spent on splitting the data for the ordersDF variable)

//create database
sqlContext.sql("create database nvs_retail_db_txt")

val stg_orders = ("/user/maria_dev/retail_db/orders/")

//Splitting the line based on the comma separator AND splitting DATE based on the "-" (dash) sign
val ordersDF = stg_orders.map(x => {
  val s = x.split(",")
  (s(0).toInt,
   s(1),
   s(1).split("-")(0) + s(1).split("-")(1),
   s(2).toInt,
   s(3))
}).filter(x => x._5 == "CLOSED" || x._5 == "COMPLETE")
  .toDF("order_id", "order_date", "order_month", "customer_id", "order_status")

//no need to create an ORC or Text Input based table first
ordersDF.saveAsTable("nvs_retail_db_txt.orders")

val stg_orderitems = sc.textFile("/user/maria_dev/retail_db/order_items/")
val orderitemsDF = stg_orderitems.map(x => (x.split(",")(1).toInt, x.split(",")(4).toFloat))
.toDF("order_items_order_id","item_revenue")

orderitemsDF.saveAsTable("nvs_retail_db_txt.order_items")

val stg_cust = sc.textFile("/user/maria_dev/retail_db/customers/")
val custDF = stg_cust.map(x => (x.split(",")(0).toInt, x.split(",")(1).toString, x.split(",")(2).toString))
.toDF("cust_id","cust_firstname","cust_lastname")

custDF.saveAsTable("nvs_retail_db_txt.customers")

val cust_revenue_df = sqlContext.sql("select * from (select o.order_month, c.cust_firstname, c.cust_lastname, " + 
"round(sum(oi.item_revenue),2) cust_revenue_by_month, " + 
"rank() over (partition by o.order_month order by sum(oi.item_revenue) desc) cust_rank " + 
"from orders o join order_items oi on o.order_id = oi.order_items_order_id " +
"join customers c on o.customer_id = c.cust_id " + 
"group by o.order_month, c.cust_firstname, c.cust_lastname " + 
"distribute by 1 sort by 1, 4 desc) q where cust_rank <= 5")

cust_revenue_df.saveAsTable("nvs_retail_db_txt.top5_customers_per_month")

#17

Using @sneha_ananthan's efficient query:

Part 1 - Load into Hive

============================

HDFS prepare

hadoop fs -mkdir /user/root/retail_db
cd /home/root/data-master
hadoop fs -put retail_db/* /user/root/retail_db

Connect to hive

beeline -u jdbc:hive2://localhost:10000/default -n root -p xxxxxx

Create the database

create database root_retail_db_txt;
use root_retail_db_txt;

PREPARE AND LOAD TABLES

Orders

DROP TABLE IF EXISTS orders;
CREATE EXTERNAL TABLE orders (
order_id INT,
order_date TIMESTAMP,
order_customer_id INT,
order_status STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
LOCATION '/user/root/retail_db/orders/';

Order_items

DROP TABLE IF EXISTS order_items;
CREATE EXTERNAL TABLE order_items (
order_item_id INT,
order_item_order_id INT,
order_item_product_id INT,
order_item_quantity INT,
order_item_subtotal FLOAT,
order_item_product_price FLOAT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
LOCATION '/user/root/retail_db/order_items/';

Customers

DROP TABLE IF EXISTS customers;
CREATE TABLE customers (
customer_id INT,
customer_fname STRING,
customer_lname STRING,
customer_email STRING,
customer_password STRING,
customer_street STRING,
customer_city STRING,
customer_state STRING,
customer_zipcode INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
LOCATION '/user/root/retail_db/customers/';

Part 2 - Load with spark

==============================
pyspark --master yarn \
  --conf spark.ui.port=12890 \
  --num-executors 2 \
  --executor-memory 512M

results_DF = sqlContext.sql("select q.month, q.customer_id, q.revenue_per_month_per_customer from \
(select customer_id, cast(concat(substr(order_date, 1, 4), substr(order_date, 6, 2)) as int) month, \
sum(order_item_subtotal) revenue_per_month_per_customer, \
rank() over (partition by cast(concat(substr(order_date, 1, 4), substr(order_date, 6, 2)) as int) order by sum(order_item_subtotal) desc) rank \
from root_retail_db_txt.orders join root_retail_db_txt.order_items \
on order_id = order_item_order_id \
join root_retail_db_txt.customers \
on order_customer_id = customer_id \
group by cast(concat(substr(order_date, 1, 4), substr(order_date, 6, 2)) as int), customer_id \
order by month asc, revenue_per_month_per_customer desc) q \
where q.rank <= 5")

results_DF.registerTempTable("results_DF")

solution_DF = sqlContext.sql("select rdf.month, rdf.revenue_per_month_per_customer, c.* from results_DF rdf join root_retail_db_txt.customers c on rdf.customer_id = c.customer_id")

solution_DF.registerTempTable("solution_DF")

sqlContext.sql("DROP TABLE IF EXISTS root_retail_db_txt.top5_customers_per_month")
sqlContext.sql("create table root_retail_db_txt.top5_customers_per_month \
as \
SELECT * from solution_DF")

sqlContext.sql("select * from root_retail_db_txt.top5_customers_per_month").show()