If you can solve these problems, you may be ready for CCA-175. Give it a shot!


Note: Avro/Snappy does not add a ".snappy" suffix to the output file name.
One way to confirm compression actually worked is to write the output with and without compression into two different directories and compare the file sizes.
As far as I know, gzip has no effect on Avro; only snappy (and deflate) are supported codecs for Avro.
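To make the size comparison concrete, spark-avro exposes a compression setting. A minimal sketch, assuming the databricks spark-avro package is on the classpath, `sqlContext` is available, and `df` is any DataFrame (the output paths are placeholders):

```scala
import com.databricks.spark.avro._

// spark-avro reads this setting; valid values are "uncompressed", "snappy", "deflate"
sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
df.write.avro("/tmp/avro_snappy")

sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
df.write.avro("/tmp/avro_uncompressed")

// then compare from the shell:
//   hadoop fs -du -s -h /tmp/avro_snappy /tmp/avro_uncompressed
```

Neither directory's file names carry a codec suffix, so the size difference is the only tell.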




val order_items = sqlContext.read.parquet("/user/princefayaz84/jay/problem7/order_items")

import com.databricks.spark.avro._;
val products = sqlContext.read.avro("/user/princefayaz84/jay/problem7/products")

// Register the DataFrames so they are visible to sqlContext.sql
order_items.registerTempTable("order_items")
products.registerTempTable("products")

val product_tab = sqlContext.sql("select p.product_id, p.product_price, count(oi.order_item_id) order_count " +
"from products p " +
"left outer join order_items oi " +
"on p.product_id = oi.order_item_product_id " +
"group by p.product_id, p.product_price " +
"order by order_count desc")

// split takes a regex, so "|" must be escaped; unescaped it matches between every character
product_tab.rdd.map(rec => rec.mkString("|")).map(rec => (rec.split("\\|")(0), rec)).coalesce(1).saveAsSequenceFile("/user/princefayaz84/jay/problem8/tab_seq_file_tab")

hadoop fs -cat /user/princefayaz84/jay/problem8/tab_seq_file_tab/part-00000 |head -n 5

SEQorg.apache.hadoop.io.Text org.apache.hadoop.io.Text

val seqfile = sc.sequenceFile("/user/princefayaz84/jay/problem8/tab_seq_file_tab",classOf[org.apache.hadoop.io.Text],classOf[org.apache.hadoop.io.Text])


Hi All

I am able to save as a sequence file, but when I read it back in spark-shell using seqfile.take(10).foreach(println), it throws the error below:

ERROR TaskSetManager: Task 0.0 in stage 61.0 (TID 1276) had a not serializable result: org.apache.hadoop.io.Text
Serialization stack:
- object not serializable (class: org.apache.hadoop.io.Text, value: 3)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (3,304|299.99|0))
- element of array (index: 0)
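The error comes from `take(10)` shipping raw `org.apache.hadoop.io.Text` objects, which are not java-serializable, back to the driver. A sketch of the usual fix, reusing the path from the snippets above: convert each Text to a String in a map before running any action.

```scala
import org.apache.hadoop.io.Text

val seqfile = sc.sequenceFile("/user/princefayaz84/jay/problem8/tab_seq_file_tab",
  classOf[Text], classOf[Text])

// Text is mutable and non-serializable; materialize plain Strings first
val readable = seqfile.map { case (k, v) => (k.toString, v.toString) }
readable.take(10).foreach(println)
```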



For Problem 6, I believe we need to find the order_total (the combined total price for the order). When we group by all of order_id, order_item_id, order_item_product_price, and order_item_subtotal, we still get the same value. Instead, use the query below:

select oi.order_item_order_id ORDER_ID, order_item_id, order_item_product_price product_price, order_item_subtotal order_subtotal, order_total
from order_items oi,
(select order_item_order_id, cast(sum(order_item_subtotal) as decimal(10,2)) order_total from order_items group by order_item_order_id) k,
orders o
where k.order_item_order_id = oi.order_item_order_id and
o.order_id = oi.order_item_order_id and
o.order_status in ('CLOSED', 'COMPLETE');

Or, if we are using Hive, we can generate order_total with a window function (partition by):

select o.order_id, oi.order_item_id, oi.order_item_product_price,
round(oi.order_item_subtotal, 2) order_subtotal,
round(sum(oi.order_item_subtotal) over (partition by o.order_id), 2) order_total
from orders o join order_items oi
on o.order_id = oi.order_item_order_id
where o.order_status in ('COMPLETE', 'CLOSED')

Correct me if I am wrong.



Hi guys,
I was trying to solve the problems mentioned here. My concern: while using coalesce to limit the number of output files when saving snappy-compressed Avro files, the spark shell gets stuck and I see a lot of exceptions and retries. Did anyone face this kind of problem? Is coalesce not supported with snappy and Avro?
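Coalesce itself should work with Avro plus Snappy; more likely the single coalesced task ends up doing all the upstream work and stalls. A sketch of a workaround worth trying, assuming spark-avro is loaded and `df` and the output path are placeholders:

```scala
import com.databricks.spark.avro._

sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")

// coalesce(1) folds the upstream stages into one task; repartition(1) adds a
// shuffle but keeps the earlier stages parallel, which often avoids the stall
df.repartition(1).write.avro("/tmp/out_avro_snappy")
```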

Thanks in advance




For problem 6, the requested order_total is the total price for a particular order, so the partition-by column should be only order_id.

The approach should be:

select a.o_id ORDER_ID, b.o_i_id ORDER_ITEM_ID,
cast(b.p_price as decimal(10,2)) PRODUCT_PRICE,
cast(b.total as decimal(10,2)) ORDER_SUBTOTAL,
sum(cast(b.total as decimal(10,2))) over (partition by a.o_id) ORDER_TOTAL
from ord a, item b
where a.o_id = b.o_i_id and order_status in ('CLOSED', 'COMPLETE')



Here is my solution for Problem 6:

sqoop import \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username "retail_dba" \
--password "cloudera" \
--table orders \
--target-dir /user/cloudera/jay/problem6/orders/ \
--fields-terminated-by '\t'

sqoop import \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username "retail_dba" \
--password "cloudera" \
--table order_items \
--target-dir /user/cloudera/jay/problem6/order-items/ \
--fields-terminated-by '\t'

var ordersData = sc.textFile("/user/cloudera/jay/problem6/orders")
var ordersitemsData = sc.textFile("/user/cloudera/jay/problem6/order-items/")

// Parse the tab-delimited files and register temp tables for the SQL below
// (column positions assume the standard retail_db schemas)
import sqlContext.implicits._

var ordersMap = ordersData.map(_.split("\t")).map(r => (r(0).toInt, r(3))).toDF("order_id", "order_status")
ordersMap.registerTempTable("orders")

var orderitemsMap = ordersitemsData.map(_.split("\t")).map(r => (r(0).toInt, r(1).toInt, r(4).toFloat, r(5).toFloat)).toDF("order_item_id", "order_item_order_id", "order_item_subtotal", "order_item_product_price")
orderitemsMap.registerTempTable("order_items")



var resultData = sqlContext.sql("""
create table default.jay_mock_orderdetails stored as orc as
select o.order_id as ORDER_ID
,oi.order_item_id as ORDER_ITEM_ID
,oi.order_item_product_price as PRODUCT_PRICE
,cast(oi.order_item_subtotal as decimal(10,2)) as ORDER_SUBTOTAL
,count(1) as ORDER_TOTAL
from orders o
inner join order_items oi
on o.order_id = oi.order_item_order_id
where o.order_status in ('CLOSED','COMPLETE')
group by o.order_id, oi.order_item_id, oi.order_item_product_price, oi.order_item_subtotal
""")



Hello, Very good post.
It helped me a lot to crack the exam.

Thank you.



For problem 6, grouping by order_item_id doesn't make sense, as it's the primary key of the order_items table. If we need the fields below in the output, then we should join the table twice:


1) Filter orders for CLOSED and COMPLETE, join with order_items, and calculate the sum grouped by order_id.
2) Join the above result with order_items again, so you have all the fields from order_items plus order_total from the first join.

or use Window functions to calculate the sum of orders.
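The window-function route can be sketched as below, assuming DataFrames `orders` and `order_items` with the standard retail_db columns (note that in Spark 1.x, window functions over DataFrames need a HiveContext):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{sum, round}

// attach the per-order total to every order_item row
val w = Window.partitionBy("order_item_order_id")

val result = orders
  .filter("order_status in ('CLOSED','COMPLETE')")
  .join(order_items, orders("order_id") === order_items("order_item_order_id"))
  .withColumn("order_total", round(sum("order_item_subtotal").over(w), 2))
  .select("order_id", "order_item_id", "order_item_product_price",
          "order_item_subtotal", "order_total")
```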



hello, thank you for this great resource; it is helping me a lot in my preparation for the CCA175 examination. I have a general question about problem 8.

Is it possible to do this question without using an outer join in Spark SQL? I am joining the two DFs, running an SQL query on the joined DF, and selecting product_id, product_price, and count(order_item_id). Would this way work?

This is my entire solution: joined.sqlContext.sql("select product_id, product_price, count(order_item_id) order_count from joined_table group by product_id, product_price order by order_count desc")
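For comparison, the same aggregation through the DataFrame API (a sketch; `products` and `order_items` are assumed to be the DataFrames read earlier in this thread). One caveat of skipping the outer join: products with no matching order_items drop out of an inner-join result, so the two answers differ for zero-order products.

```scala
import org.apache.spark.sql.functions.{count, desc}

val result = products
  .join(order_items, products("product_id") === order_items("order_item_product_id"))
  .groupBy("product_id", "product_price")
  .agg(count("order_item_id").alias("order_count"))
  .orderBy(desc("order_count"))
```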

Thank you again,



Problem 9:

Your solution is correct, this is just another solution to consider…

  1. read the avro data into Spark
  2. convert the avro data to json and write it into an HDFS location
  3. read the json data you just converted back into Spark from HDFS
  4. use the dataframe api to transform the data
  5. write the data back into HDFS in json format

val filtered = ojson.where("order_status = 'COMPLETE'")

Although Spark SQL is more robust than the DataFrame API, the DataFrame API comes in handy for simple transformations.

Best regards!