# Need some help on Arun's Blog Solutions(CCA 175)

#1

Hi All,
I am preparing for CCA 175 certification in Pyspark. I was trying to solve the exercises from Arun’s blog, in pyspark. but got badly stuck in some issues as it is completely explained in scala.
Is there anyone who knows both scala and python, can help me in this?

That would really be a great help.

Aparna

#2

I would recommend you to practice file format from Arun’s blog. For Spark, you can continue with Durga’s sessions.
Hope this helps.
Thanks
Venkat

#3

Hi Venkat,
Thanks a lot for your reply. Good to hear that you remember my other problem It still persists.
and I am following the videos from itversity only ,but wanted to practice some examples.
Have seen in almost all the successful stories, Arun’s blog has been playing a very vital role in the exam preparation .

saw the use of combineByKey() in his first exercise, which was not explained in the itversity videos.I tried it in pyspark, but somewhere I am wrong not to get the matching values, for one column. Would it be possible for you to help me in this?

Thanks
Aparna

#4

Hi Aparna,

I could be of help, if you would like to post your solution to the problem.

Cheers
Akash

#5

Hi Akash,
Thank you so much for your reply. That’s really so nice of you.
I have tried to solve the problem in two methods and have asked my questions at the end.
Please go through and code and let me know if there is anything wrong/needs to be modified.

Problem Scenario:

Please find total orders and total amount per status per day.
The result should be sorted by order date in descending, order status in ascending and total amount in descending and total orders in ascending.

Solution -
(using spark sql)

ordersDF.registerTempTable(“orders_table”)
order_itemsDF.registerTempTable(“order_items_table”)

sqlContext.sql("select to_date(order_date) order_date,order_status, round(sum(order_item_subtotal),2) Total_amount, count(distinct(order_id)) Total_orders from orders o join order_items oi on o.order_id = oi.order_item_order_id group by order_date, order_status order by order_date desc, order_status,Total_amount desc,Total_orders ").show()

|Order_date| order_status|Total_amount|Total_count|
±---------±--------------±-----------±----------+
|2014-07-24| CANCELED| 1254.92| 2|
|2014-07-24| CLOSED| 16333.16| 26|
|2014-07-24| COMPLETE| 34552.03| 55|
|2014-07-24| ON_HOLD| 1709.74| 4|
|2014-07-24| PAYMENT_REVIEW| 499.95| 1|
|2014-07-24| PENDING| 12729.49| 22|
|2014-07-24|PENDING_PAYMENT| 17680.7| 34|
|2014-07-24| PROCESSING| 9964.74| 17|
|2014-07-24|SUSPECTED_FRAUD| 2351.61| 4|
|2014-07-23| CANCELED| 5777.33| 10|

(using RDD)

ordersMap = ordersDF.map(lambda x: (x[0], (x[1],x[3])))
order_itemsMap = order_itemsDF.map(lambda x: (x[1], x[4]))

JoinedOrders = ordersMap.join(order_itemsMap)
for i in JoinedOrders.take(5): print(i)
(8192, ((1379131200000, u’PENDING_PAYMENT’), 179.97000122070312))
(8192, ((1379131200000, u’PENDING_PAYMENT’), 399.9800109863281))
(36864, ((1394254800000, u’PENDING_PAYMENT’), 200.0))
(36864, ((1394254800000, u’PENDING_PAYMENT’), 100.0))
(36864, ((1394254800000, u’PENDING_PAYMENT’), 129.99000549316406))

JoinedOrdersMap = JoinedOrders.map(lambda x: x[1])
for i in JoinedOrdersMap.take(5): print(i)
((1379131200000, u’PENDING_PAYMENT’), 179.97000122070312)
((1379131200000, u’PENDING_PAYMENT’), 399.9800109863281)
((1394254800000, u’PENDING_PAYMENT’), 200.0)
((1394254800000, u’PENDING_PAYMENT’), 100.0)
((1394254800000, u’PENDING_PAYMENT’), 129.99000549316406)

Total_Order_Count = JoinedOrdersMap.combineByKey(lambda x: (x,1), lambda x, y: (x[0]+y, x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1]))

for i in Total_Order_Count.take(5): print(i)
((1391490000000, u’ON_HOLD’), (3679.4700469970703, 18))
((1400644800000, u’ON_HOLD’), (4669.360048294067, 25))
((1395547200000, u’ON_HOLD’), (6344.520118713379, 29))
((1380772800000, u’PENDING’), (13096.30020904541, 68))
((1387083600000, u’PENDING’), (11918.920240402222, 54))

Total_Order_CountDF = Total_Order_Count.map(lambda x: Row(order_date = x[0][0], order_status = x[0][1],Total_order=x[1][0], Total_Count = x[1][1])).toDF()

Total_Order_CountDF.registerTempTable(“TotalOrders”)

sqlContext.sql(“select to_date(from_unixtime(order_date/1000)) order_date,order_status,round
(Total_order, 2)Total_order ,Total_Count from TotalOrders order by order_date desc, order_status, Total_order desc,Total_Count”).show()

|order_date| order_status|Total_order|Total_Count|
±---------±--------------±----------±----------+
|2014-07-24| CANCELED| 1254.92| 6|
|2014-07-24| CLOSED| 16333.16| 86|
|2014-07-24| COMPLETE| 34552.03| 173|
|2014-07-24| ON_HOLD| 1709.74| 8|
|2014-07-24| PAYMENT_REVIEW| 499.95| 1|

Question 1:
If you see, there is a mismatch in Total_Count column, as count of distinct(order_id) is evaluated in the sql-query, but only count(order_id), in the combineByKey() . How can I achieve count of distinct order_id using combineByKey() ?

Question 2:
In RDD method, combineByKey() is implemented on the JoinedOrdersMap, which is a paired RDD with one field in the value.
i.e ((order_date,order_status),order_item_subtotal)
How to implement combineByKey() for the same purpose when we have two fields in the value.((order_date,order_status), (order_item_subtotal, order_id))

Please let me know if there is any better approach.

Thanks
Aparna