If you can solve these problems, you may be ready for CCA-175. Give it a shot!


#1

I have gained so much from Durga's Udemy courses (coupons here), the itversity labs and discussion forums, and Arun's blogs,
so I wanted to give something back to this community for other folks who are preparing for the exam.
I hope you find this useful. If any typo or correction is needed in the questions, please add it in a comment and I will fix it.

In my opinion, if you can solve these, then you have reached the comfort level needed to take the exam.


Background
#Use itversity labs tables (orders, order_items, products)
#Use Scala/Spark RDD/Spark SQL/DF-DataFrame/Hive SQL
#Some functions you need to be familiar with to solve these (a quick sketch follows the list)
reduceByKey
sortByKey
groupByKey
aggregateByKey
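A quick pyspark illustration of these four calls on a toy pair RDD (a hedged warm-up sketch, not exam code; the statuses are made up):

pairs = sc.parallelize([("COMPLETE", 1), ("CLOSED", 1), ("COMPLETE", 1)])
pairs.reduceByKey(lambda a, b: a + b).collect()   # e.g. [('CLOSED', 1), ('COMPLETE', 2)]
pairs.sortByKey().collect()                       # sorted by the status key
pairs.groupByKey().mapValues(sum).collect()       # same counts via groupByKey
pairs.aggregateByKey(0, lambda acc, v: acc + v, lambda a, b: a + b).collect()  # same via aggregateByKey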
#Learn/Remember these calls
spark-shell --master yarn --packages com.databricks:spark-avro_2.10:2.0.1 --conf spark.ui.port=xxxxxx
sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip") (or snappy)
sqlContext.setConf("spark.sql.xxxxx.compression.codec", "uncompressed") --> very important to know how to reset; if you don't, open a new spark shell for every problem to be on the safe side, so that you don't mix up compressed and uncompressed output.
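For example, a minimal sketch of resetting both codecs before starting a new problem (assuming the same sqlContext session):

# reset codecs to uncompressed so the previous problem's settings don't leak over
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")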

someDF.registerTempTable("temptablename")
sqlContext.sql("select * from temptablename").show
var someDF=sqlContext.sql("select * from temptablename")
someDF.write.avro / someDF.write.parquet
someDF.saveAsTable("hivetablename")

var hc = new org.apache.spark.sql.hive.HiveContext(sc);
var someDF = hc.sql("select xxxxxx from dbname.tablename")

To handle output as a sequence file, you need an RDD of (K, V) pairs; K and V should both be strings.
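A minimal pyspark sketch of that (someRDD, the field positions, and the output path are hypothetical):

# both key and value are converted to strings before saving
kv = someRDD.map(lambda x: (str(x[0]), str(x[0]) + "|" + str(x[1])))
kv.saveAsSequenceFile("/user/yourusername/jay/sequence-output")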

#You can solve some of these problems either using RDD/DF functions or using Spark SQL.
My way of doing it: if only one table is involved, or two tables with a simple join, do it with RDD/DF functions (join/leftOuterJoin).
If two or more tables are involved with complex joins, convert the DFs to temp tables, do the joins in SQL, and save the output.
++++++++++++++++++++++++++++++++

Problem 1:
Import the orders table from mysql (db: retail_db, user: retail_user, password: xxxx)
Import only records that are in "COMPLETE" status
Import all columns other than customer id
Save the imported data as text, tab delimited, in this HDFS location: /user/yourusername/jay/problem1/

Problem 2:
Import the orders table from mysql (db: retail_db, user: retail_user, password: xxxx)
Import all records and columns from the orders table
Save the imported data as text, tab delimited, in this HDFS location: /user/yourusername/jay/problem2/

Problem 3:
Export the orders data into mysql
Input source: /user/yourusername/jay/problem2/
Target table: MySQL, DB = retail_export, table name: jay__mock_orders
The reason for the alias prefix in the table name is to avoid overwriting other users' tables in the shared labs MySQL DB.

Problem 4:
Read data from hive, perform a transformation, and save it back to HDFS
Read the table populated in Problem 3 (jay__mock_orders)
Produce output in this format (2 fields), sorted by order count descending, and save it as avro with snappy compression in HDFS location /user/yourusername/jay/problem4/avro-snappy
ORDER_STATUS : ORDER_COUNT
COMPLETE 54
CANCELLED 89
INPROGRESS 23

Save the above output, as avro with snappy compression, in HDFS location /user/yourusername/jay/problem4/avro

Problem 5:

Import the orders table from mysql (db: retail_db, user: retail_user, password: xxxx)
Import all records and columns from the orders table
Save the imported data as avro with snappy compression in HDFS location /user/yourusername/jay/problem5-avro-snappy/

Read above hdfs data
Consider only orders in "COMPLETE" status with order id between 1000 and 50000 (1001 to 49999)
Save the output (only 2 columns: order id and order status) in parquet format with gzip compression in location /user/yourusername/jay/problem5-parquet-gzip/
Advanced: try to save the output in only 2 files (tip: use coalesce(2))

Problem 6:

Import the orders table from mysql (db: retail_db, user: retail_user, password: xxxx)
Import all records and columns from the orders table
Save the imported data as text, tab delimited, in this HDFS location: /user/yourusername/jay/problem6/orders/

Import the order_items table from mysql (db: retail_db, user: retail_user, password: xxxx)
Import all records and columns from the order_items table
Save the imported data as text, tab delimited, in this HDFS location: /user/yourusername/jay/problem6/order-items/

Read the orders data from the above HDFS location
Read the order items data from the above HDFS location
Produce output in this format (price and total should be treated as decimals)
Consider only CLOSED & COMPLETE orders
ORDER_ID ORDER_ITEM_ID PRODUCT_PRICE ORDER_SUBTOTAL ORDER_TOTAL

Note: ORDER_TOTAL = combined total price for this order (i.e., the sum of ORDER_SUBTOTAL across all items of the order)

Save the above output as ORC in the Hive table "jay_mock_orderdetails"
(Tip: try saving into the Hive table directly from the DF, without creating the table manually)

Note: this problem was updated on Jun 4 with more details to reduce ambiguity, based on feedback/comments from users. (Thank you!)

Problem 7:

Import the order_items table from mysql (db: retail_db, user: retail_user, password: xxxx)
Import all records and columns from the order_items table
Save the imported data as parquet in this HDFS location: /user/yourusername/jay/problem7/order-items/

Import the products table from mysql (db: retail_db, user: retail_user, password: xxxx)
Import all records and columns from the products table
Save the imported data as avro in this HDFS location: /user/yourusername/jay/problem7/products/

Read the above order items and products data from the HDFS locations
Produce this output :

ORDER_ITEM_ORDER_ID PRODUCT_ID PRODUCT_NAME PRODUCT_PRICE ORDER_SUBTOTAL

Save the above output as avro with snappy compression in HDFS location /user/yourusername/jay/problem7/output-avro-snappy/

Note: this problem was updated on Jun 4 with more details to reduce ambiguity, based on feedback/comments from users. (Thank you!)

Problem 8:

Read order items from /user/yourusername/jay/problem7/order-items/
Read products from /user/yourusername/jay/problem7/products/

Produce output that shows the product id and the total number of orders for each product id.
Output should be in this format, sorted by order count descending.
If a product id has no orders, then the order count for that product id should be "0".

PRODUCT_ID PRODUCT_PRICE ORDER_COUNT

Output should be saved as a sequence file with Key = PRODUCT_ID and Value = PRODUCT_ID|PRODUCT_PRICE|ORDER_COUNT (pipe separated)

Problem 9:

Import the orders table from mysql (db: retail_db, user: retail_user, password: xxxx)
Import all records and columns from the orders table
Save the imported data as avro in this HDFS location: /user/yourusername/jay/problem9/orders-avro/

Read the above avro orders data
Convert it to JSON
Save the JSON text file in HDFS location /user/yourusername/jay/problem9/orders-json/

Read json data from /user/yourusername/jay/problem9/orders-json/
Consider only "COMPLETE" orders.
Save order id and order status (just 2 columns) as a JSON text file in location /user/yourusername/jay/problem9/orders-mini-json/

Updated on Jun 9 (File formats quick review sheet attached to this blog)


#2

id, fname, lname, address
1,kanna,rakesh,6,35george st
2,kan,rak,31,3 may st
2,jimmy,jim,594,blaxland

There are only 4 columns with "," as the delimiter, but there is a "," in the address column as well. How do you split this data? Please help.


#3

In pyspark:

s = ["1,kanna,rakesh,6,35george st", "2,kan,rak,31,3 may st", "2,jimmy,jim,594,blaxland"]

rdd1 = sc.parallelize(s)

def mysplit(x):
     x_split = x.split(",")
     return x_split[0], x_split[1], x_split[2], ",".join(x_split[3:])

rdd1.map(mysplit).collect()

#will show:
[('1', 'kanna', 'rakesh', '6,35george st'), ('2', 'kan', 'rak', '31,3 may st'), ('2', 'jimmy', 'jim', '594,blaxland')]
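A small variation on the same idea (my addition, not from the original post): Python's split takes a maxsplit argument, so you can split off the first three commas and keep the remainder as the address:

rdd1.map(lambda x: tuple(x.split(",", 3))).collect()
# same result: the 4th element keeps any commas inside the address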

I hope this helps.


#4

Great work Jay. Congratulations. Thanks so much for sharing your inputs generously.

Best
Sekhar


#5

Hi Jay,
Many congratulations on your success!! Really great work!
I tried your exercises; they are really interesting.

  1. I have a doubt in exercise 6: I was not able to create the table in Hive in ORC format from a dataframe.

Please have a look at my code.

resultDF = sqlContext.sql("select order_id,order_item_id,order_item_product_price, order_item_subtotal,count(1) order_total from order_items oi join orders o on o.order_id = oi.order_item_order_id group by order_id,order_item_id,order_item_product_price,order_item_subtotal")

resultDF.registerTempTable("result")

sqlContext.sql("create table aparna_mock_orderdetails as select * from result")

How can I create a table in ORC format in Hive?

2. One small correction, I think, should be in exercise 7.
In the output as mentioned:
ORDER_ID PRODUCT_ID PRODUCT_PRICE ORDER_SUBTOTAL

I think ORDER_ID should be ORDER_ITEM_ID / ORDER_ITEM_ORDER_ID.

Please reply.

Thanks in advance
Aparna


#6

I believe you can create an ORC-format Hive table from the DataFrame itself; you may try:

resultDF.write.format("orc").saveAsTable(hiveTable)

I have tried it before and it worked. Let me know whether it works in this example or not.

I am also trying these questions. Can you post your answers? I just want to verify mine.

I have an exam on 9th June, 2018.

Thanks,
Meghal


#7

Hi Meghal,
Thank you so much. I will try with this and let you know.
I too want to validate my answers, so I will do them again and post them here (as I haven't saved the solutions).
Wishing you all the best for your exam. I am planning to take it shortly too.

Thanks
Aparna


#8

Hi Meghal,
Kindly find my solutions below.
I have tried to solve them all in Spark SQL, as I am more comfortable with it.
Please check and let me know if there is any modification/correction needed.

I still have a doubt in exercise 6: it is a count of what? As per my query, it counts the order_id and shows 1 for each row (which I think is wrong).

Problem 1.

sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table orders \
--columns order_id,order_date,order_status \
--where "order_status like 'COMPLETE'" \
--target-dir /user/aparna149/aparna/problem1 \
--as-textfile \
--fields-terminated-by '\t'

Problem 2.

sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table orders \
--target-dir /user/aparna149/aparna/problem2 \
--fields-terminated-by '\t' \
--as-textfile

Problem 3.

In mysql:
create table aparna_mock_orders as select * from retail_db.orders;
truncate aparna_mock_orders;

sqoop export \
--connect jdbc:mysql://ms.itversity.com:3306/retail_export \
--username retail_user \
--password itversity \
--table aparna_mock_orders \
--export-dir /user/aparna149/aparna/problem2 \
--input-fields-terminated-by '\t'

Problem 4.

sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_export \
--username retail_user \
--password itversity \
--query "select order_status,count(1) order_count from aparna_mock_orders where \$CONDITIONS group by order_status order by order_count desc" \
--as-avrodatafile \
--compress \
--compression-codec snappy \
--target-dir /user/aparna149/aparna/problem4/avro-snappy \
-m 1

Problem 5.

sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table orders \
--as-avrodatafile \
--compress \
--compression-codec snappy \
--target-dir /user/aparna149/aparna/problem5-avro-snappy/

orders = sqlContext.read.format("com.databricks.spark.avro").load("/user/aparna149/aparna/problem5-avro-snappy")
orders.registerTempTable("orders")
result = sqlContext.sql("select order_id,order_status from orders where order_status like 'COMPLETE' and order_id > 1000 and order_id < 50000")
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
result.coalesce(2).write.parquet("/user/aparna149/aparna/problem5-parquet-gzip")

Problem 6.

sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table orders \
--as-textfile \
--fields-terminated-by '\t' \
--target-dir /user/aparna149/aparna/problem6/orders/

sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table order_items \
--as-textfile \
--fields-terminated-by '\t' \
--target-dir /user/aparna149/aparna/problem6/order_items/

from pyspark.sql import Row  # Row is needed to build the DataFrames below

orders = sc.textFile("/user/aparna149/aparna/problem6/orders/")
ordersDF = orders.map(lambda x: x.split("\t")).map(lambda x: Row(order_id=int(x[0]), order_status=x[3])).toDF()
ordersDF.registerTempTable("orders")
order_items = sc.textFile("/user/aparna149/aparna/problem6/order_items")
order_itemsDF = order_items.map(lambda x: x.split("\t")).map(lambda x: Row(order_item_id=int(x[0]), order_item_order_id=int(x[1]), order_item_subtotal=float(x[4]), order_item_product_price=float(x[5]))).toDF()
order_itemsDF.registerTempTable("order_items")

result = sqlContext.sql("select order_id, order_item_id, cast(order_item_product_price as decimal(10,2)), cast(order_item_subtotal as decimal(10,2)), sum(order_item_subtotal) order_total from order_items oi join orders o on oi.order_item_order_id = o.order_id where order_status in ('CLOSED','COMPLETE') group by order_id, order_item_id, order_item_product_price, order_item_subtotal")

(## I am not sure what should go into the query to evaluate order_total, as order_item_subtotal is already selected in the query.)

sqlContext.sql("use aparna")
result.write.format("orc").saveAsTable("aparna_mock_orderdetails")

Exercise 7.

sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table order_items \
--as-parquetfile \
--target-dir /user/aparna149/aparna/problem7/order_items/

sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table products \
--as-avrodatafile \
--target-dir /user/aparna149/aparna/problem7/products/

order_items = sqlContext.read.parquet("/user/aparna149/aparna/problem7/order_items")
order_items.registerTempTable("order_items")

products = sqlContext.read.format("com.databricks.spark.avro").load("/user/aparna149/aparna/problem7/products/")
products.registerTempTable("products")

result = sqlContext.sql("select order_item_id,product_id,cast(product_price as decimal),cast(order_item_subtotal as decimal(10,2)) from products p join order_items oi on p.product_id = oi.order_item_product_id")
sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
result.write.format("com.databricks.spark.avro").save("/user/aparna149/aparna/problem7/output-avro-snappy/")

Exercise 8

order_items = sqlContext.read.parquet("/user/aparna149/aparna/problem7/order_items")
order_items.registerTempTable("order_items")

products = sqlContext.read.format("com.databricks.spark.avro").load("/user/aparna149/aparna/problem7/products/")
products.registerTempTable("products")

result = sqlContext.sql("select product_id,product_price,count(order_item_id) order_count from products p left outer join order_items oi on p.product_id = oi.order_item_product_id group by product_id,product_price")
resultMap = result.map(lambda x: (x[0], str(x[0]) + "|" + str(x[1]) + "|" + str(x[2])))
resultMap.saveAsSequenceFile("/user/aparna149/aparna/problem8/orders_sequence")

Exercise 9

sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table orders \
--as-avrodatafile \
--target-dir /user/aparna149/aparna/problem9/orders-avro/

orders = sqlContext.read.format("com.databricks.spark.avro").load("/user/aparna149/aparna/problem9/orders-avro/")
orders.write.json("/user/aparna149/aparna/problem9/orders-json/")
ojson = sqlContext.read.json("/user/aparna149/aparna/problem9/orders-json/")
ojson.registerTempTable("ojson")
result = sqlContext.sql("select order_id, order_status from ojson where order_status like 'COMPLETE'")
result.write.json("/user/aparna149/aparna/problem9/orders-mini-json/")

Thanks
Aparna


#9

Hi @AparnaSen ,

Your answers are perfect. Below are a few points that I believe need to be considered.

-> In question 4, the first sentence asks about reading hive data and performing a transformation. I think it can be done by creating a HiveContext.

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)  // where sc is Spark Context. 
val testTable = hiveContext.sql("select * from hiveTable")

-> For question 5, I came across a similar kind of question, asking for compression with deflate level 5. See below:
sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
sqlContext.setConf("spark.sql.avro.deflate.level", "5")

-> For question 6, (after importing):
orders = sc.textFile("/user/cloudera/Problem_6/orders")
order_items = sc.textFile("/user/cloudera/Problem_6/order_items")

ordersDF = orders.map(lambda x: x.split("\t")).map (lambda x: Row(order_id = int(x[0]), order_customer_id = int(x[2])) ).toDF()

orderItemsDF = order_items.map(lambda x: x.split("\t")).map(lambda x: Row(order_item_id = int(x[0]), order_item_order_id = int(x[1]), order_item_subtotal = float(x[4]), order_item_product_price = float(x[5]) )).toDF()

 //Performed join without sql via join() function. 
  dfJoined = ordersDF.join(orderItemsDF, orderItemsDF.order_item_order_id == ordersDF.order_id)

dfJoined.registerTempTable("orderRevenue")

result1 = sqlContext.sql("""select order_id, order_item_id, cast(order_item_product_price as decimal) , cast(order_item_subtotal as decimal), count(2) order_total from orderRevenue group by order_id, order_item_id, order_item_product_price, order_item_subtotal""")

I also get the same result as yours (the last column count is 1 in every row), but that is expected from this query, because the query is only possible with a group by over all these columns, and as per the expected output, all the group-by columns are required.

I think they might ask for the order total (in terms of price) per order. Then we can just group by order_id and use sum(order_item_subtotal) to get the result. I have even seen this kind of question before.
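For illustration, a hedged sketch of that reading (ORDER_TOTAL as the summed order_item_subtotal per order, joined back onto each item; this is a guess at the intent, not a confirmed answer, and assumes the orders and order_items temp tables from above):

result = sqlContext.sql("""
select o.order_id, oi.order_item_id,
  cast(oi.order_item_product_price as decimal(10,2)) product_price,
  cast(oi.order_item_subtotal as decimal(10,2)) order_subtotal,
  cast(t.order_total as decimal(10,2)) order_total
from orders o
join order_items oi on oi.order_item_order_id = o.order_id
join (select order_item_order_id, sum(order_item_subtotal) order_total
      from order_items group by order_item_order_id) t
  on t.order_item_order_id = oi.order_item_order_id
where o.order_status in ('CLOSED', 'COMPLETE')
""")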

Apart from that, the remaining answers look correct.

Thanks.
Meghal Gandhi


#11

What is the difference between Problem 1 and 2?


#12

Never mind, I see that the solution of 2 is being used in 3.


#13

[quote="AparnaSen, post:8, topic:12529"]

Problem 4.

I tried it and it gives me an error. What am I missing? I tried using where $CONDITIONS too.

[piyushpatel@gw03 ~]$ sqoop import \

--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user --password itversity \
--query "select order_status, count(1) order_count from piyush_mock_orders $CONDITIONS group by order_status order by order_count desc" \
--split-by order_id \
--compress --compression-codec snappy \
--target-dir user/piyushpatel/piyush/problem4/avro-snappy \
--as-avrodatafile \
-m 2
Warning: /usr/hdp/2.5.0.0-1245/accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
18/06/02 19:21:47 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6.2.5.0.0-1245
18/06/02 19:21:47 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
18/06/02 19:21:47 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
18/06/02 19:21:47 INFO tool.CodeGenTool: Beginning code generation
18/06/02 19:21:48 INFO manager.SqlManager: Executing SQL statement: select order_status, count(1) order_count from piyush_mock_orders (1 = 0) group by order_status order by order_count desc
18/06/02 19:21:48 ERROR manager.SqlManager: Error executing statement: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '(1 = 0) group by order_status order by order_count desc' at line 1
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '(1 = 0) group by order_status order by order_count desc' at line 1


#14

Hi,
Please try escaping $CONDITIONS with a backslash (\$CONDITIONS) in the query; the backslash never shows up here in the post.
Let me know if it helps.

Thanks
Aparna


#15

Thanks Aparna, it works with \$CONDITIONS

sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_export \
--username retail_user \
--password itversity \
--query "select order_status,count(1) order_count from piyush_mock_orders where \$CONDITIONS group by order_status order by order_count desc" \
--as-avrodatafile \
--compress \
--compression-codec snappy \
--target-dir /user/piyushpatel/piyush/problem4/avro-snappy \
-m 1


#16

Hi Jay,
When I cast price and total as decimals, i.e. cast(order_item_product_price as decimal), it does not show any data after the decimal point. Are we supposed to cast these values as decimals or use toFloat?

Thank you
-Gaurav
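(For reference, a likely cause: in Spark SQL and Hive, cast(x as decimal) without an explicit precision and scale defaults to decimal(10,0), so the fractional digits are dropped. Specifying the scale keeps them. A minimal sketch, assuming the order_items temp table is registered:)

sqlContext.sql("select cast(order_item_product_price as decimal(10,2)) product_price from order_items").show()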


#17

@AparnaSen @meghal1911 - I think the Order Total is not a count but rather order_item_product_price * order_item_quantity.


#18

Hi Gaurav,
Yes, it could be; I am not even sure what it actually asks for. :frowning_face:

Thanks
Aparna


#19

Please go through this link. It may help.

https://stackoverflow.com/questions/49826400/pyspark-how-to-split-when-there-are-several-delimiters-in-one-column


#20

This is regarding the solution to question 8.

I tried your solution, but I did not find any records for product_ids with zero orders (count).

I modified the solution like this: "p.product_id, p.product_price, count(oi.order_item_id)", and then I could see the product_ids with zero orders.

Now my actual question is: which solution is correct? Please provide an explanation.


#21

Hi Rajesh,
Yes, you are right; thank you very much for bringing up the error.
It should be count(oi.order_item_id): counting a column from the order_items side of the left outer join returns 0 for products with no matching order items, instead of counting the joined row itself.

I will correct it right away.

Thanks
Aparna