CCA175 Advice/Direction

cca-175
apache-spark
certification
cca175

#1

Dear All,

I’ve been trying to confirm a few things regarding the CCA175 certification after reading some posts:

  1. Are we allowed to have a monitor connected to our Mac (or PC) during the exam? So 2 screens total…
  • I’ve been reading a few posts saying the screen font is very small and that the proctor bar gets in the way of typing in the shell… is this true? Having multiple screens would help a lot with navigating and such.
  2. How similar are Arun’s blog questions to the actual exam questions?
  • I’ve been struggling a bit with Arun’s questions because my main language is Python, which makes it difficult to follow along, write the equivalent code, and validate the results, since his answers are written in Scala.
  3. Are Durga’s YT playlists the same as the Udemy courses?
  • I completed Durga’s CCA175 Python playlist and feel very confident in understanding and replicating his examples, but not so much with Arun’s questions, as you can see in bullet #2.

Is anyone able to offer any additional insight? I have used Spark MLlib quite a bit and have more experience with Spark 2.x (DFs, UDFs, etc.) than with Spark 1.x (RDDs); however, there aren’t any Spark certifications with a data science focus, so I have been preparing for CCA175 for a few months now (off and on due to my work schedule). I am at the point where I am considering taking a couple of weeks to complete Durga’s CCA175 Scala playlist to get a better feel for the Scala language and syntax.

Again, if anyone is able to provide some responses/advice to my questions, I would greatly appreciate it!

thank you


#2

Just FYI, I am yet to take my exam, but here is my understanding:

  1. You are allowed to connect an external monitor, but you can only use one monitor (external or built-in).

  2. The actual exam is much easier than Arun’s blog.


#3

I asked my exam proctor this question, and he allowed me to connect another monitor. So in short, I was using both screens (laptop and external monitor); however, it is not very helpful because you can’t extend the programs from the remote desktop to the other screen.


#4

Yes, I believe so. I haven’t done Arun’s blog questions so I don’t know their difficulty level.


#5

I am still preparing for the certification based on pyspark. Being a data scientist, I do not suggest that you learn Scala. Just focus on solving Arun’s questions using pyspark. Except for the 3rd question (in Arun’s blog), I felt comfortable solving the other questions. The 3rd question deals with extracting the schema from avro file formats. Let me know if you want me to post my pyspark solutions to Arun’s questions.
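
For reference, one way to peek at an avro file’s schema from pyspark (just a rough sketch, assuming the spark-avro package is available as it is on the labs; the path is only an example):

#Load the avro file through spark-avro and inspect the recovered schema
df = sqlContext.load("orders_avro/", "com.databricks.spark.avro")  #example path
df.printSchema()          #prints the field names and types
print(df.schema.json())   #the schema as a JSON string, if you need to save it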

Durga’s CCA175 pyspark course (on Udemy) is excellent, and it should give you enough knowledge to pass the certification. Focus more on file formats (see Arun’s blog).

Good luck
Sekhar


#6

Thank you very much for the guidance! Posting your pyspark solutions to Arun’s questions would be much appreciated! I completed Durga’s YT pyspark playlist and feel comfortable with the material (I have been told the YT playlist is the same as the Udemy course)… I plan to use mostly DFs on the exam instead of RDDs, so hopefully this approach works.


#7

Some of my notes (the important points are given below). I hope this helps folks who are preparing for CCA175 based on pyspark:

##Reading/writing/compressing files:

#To read any text file (compressed or uncompressed) into an RDD:
rdd = sc.textFile("…path")

#To write an RDD as a text file to HDFS:
rdd.saveAsTextFile("…path", compressionCodecClass="org.apache…")
#By default compressionCodecClass=None, which means no compression.

See the /etc/hadoop/conf/core-site.xml and search for codec to find available compression codec classes.
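
You can also print the same codec list from inside pyspark (a small sketch; sc._jsc is an internal handle, so treat this as a convenience rather than an official API):

#Print the codec classes configured on the cluster (same list as in core-site.xml)
print(sc._jsc.hadoopConfiguration().get("io.compression.codecs"))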

#To read any sequence file (compressed or uncompressed) into an RDD:
rdd = sc.sequenceFile("…path")

#To write an RDD into a sequence file:
rdd.saveAsSequenceFile("…path", compressionCodecClass="org.apache…")

#To read a parquet file (compressed/uncompressed):
df = sqlContext.read.parquet("…path")

#To read an orc file (compressed/uncompressed):
df = sqlContext.read.orc("…path")

#To read a json file (compressed/uncompressed):
df = sqlContext.read.json("…path")

#To read a text file (compressed/uncompressed) to a data frame:
df = sqlContext.read.text("…path")

#To read an avro file (compressed/uncompressed):
df = sqlContext.load("…path", "com.databricks.spark.avro")

#To write a data frame in parquet, avro, json or orc format, with/without compression:

################################
#Set the parquet compression #
################################
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
#In place of "gzip", you can use "snappy", "lzo" or "uncompressed"

#######################################
#Write the file now in parquet format #
#######################################
df.write.parquet("…path")

#######################
#Set avro compression #
#######################
sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
#Can be "snappy", "deflate" or "uncompressed"

###########################
#Write in avro format now #
###########################
df.save("…path", "com.databricks.spark.avro")

#########################
#Set orc compression ##
#########################
#I think by default the data is compressed by snappy format for ORC
df.write.orc("…path")

##########################################
#Write the JSON file without compression #
##########################################
df.toJSON().saveAsTextFile("…path")

####################
#With compression #
####################
df.toJSON().saveAsTextFile("…path", compressionCodecClass="org…")

################################################
#Write RDD and DF with desired number of partitions ##
################################################

#For RDD
rdd.coalesce(2).saveAsTextFile("…path",compressionCodecClass="…")

#For DF
final_df.rdd.coalesce(2).toDF().write.parquet("problem1", mode="overwrite")

#DF write mode:
df.write.parquet("problem1", mode="overwrite")
#mode can be "overwrite", "append" or "error"

##Some exercises (Not Arun’s blog exercises. Will post those solutions later):

#Read the data from /public/retail_db/orders into an RDD
rdd1 = sc.textFile("/public/retail_db/orders")

#Save the RDD as text file with snappy compression
rdd1.saveAsTextFile("orders_snappy/", compressionCodecClass="org.apache.hadoop.io.compress.SnappyCodec")

#Read the snappy compressed data into another data frame
#First read it into an RDD, then convert it to a data frame
rdd2 = sc.textFile("orders_snappy/*")
df = rdd2.map(lambda x: x.split(",")).map(lambda x: (int(x[0]),x[1],int(x[2]),x[3])).toDF(["orderID","orderTime","custId","Status"])

#Save the data as a sequence file with compression (snappy)
#To save as sequence file, you must have a key-value pair RDD.
rdd2.keyBy(lambda x: int(x.split(",")[0])).saveAsSequenceFile("orders_sequence/", compressionCodecClass="org.apache.hadoop.io.compress.SnappyCodec")

#Read the data back from sequence file into another RDD
rdd3 = sc.sequenceFile("orders_sequence/")

#Create orders data frame
orders_df = rdd1.map(lambda x: x.split(",")).map(lambda x: (int(x[0]),x[1],int(x[2]),x[3])).toDF(["orderId","orderTime","custID","orderStatus"])

#Save the file as avro with snappy compression
sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")

orders_df.save("orders_avro/", "com.databricks.spark.avro")

#Read back the saved data frame again
df_new = sqlContext.load("orders_avro/", "com.databricks.spark.avro")

#Create a hive table referencing the avro file (compressed)
#Only ORC can be compressed.

use sekhar_db;

create table orders_avro(order_id int, order_time timestamp, cust_id int, order_status string) stored as AVRO;

orders_df.registerTempTable("orders_df")

sqlContext.sql("""INSERT INTO TABLE sekhar_db.orders_avro SELECT * FROM orders_df""")

#Query the hive table
SELECT * from sekhar_db.orders_avro


#8

##Arun blog problems and solutions:
##http://arun-teaches-u-tech.blogspot.com/p/cca-175-prep-problem-scenario-1.html
########################
##Problem-1’s solution #
########################

NOTE: These are not exhaustive solutions. Arun asked us to use RDDs, DFs and SQL to solve these, but I provided only one solution each. Also, these were implemented on the itversity labs, so the input/output directories will be different from the ones asked for in the questions. Lastly, I tested these solutions, but I am not sure whether they are the correct/optimal solutions, so use them at your own risk.

1a. Using sqoop, import orders table into hdfs to folders /user/cloudera/problem1/orders. File should be loaded as Avro File and use snappy compression

sqoop import \
--connect jdbc:mysql://ms.itversity.com/retail_db \
--username retail_user \
--password itversity \
--table orders \
--delete-target-dir \
--target-dir orders \
--as-avrodatafile \
--compress \
--compression-codec="org.apache.hadoop.io.compress.SnappyCodec"

##Verify your work by comparing the number of imported records with the actual count

sqoop eval \
--connect jdbc:mysql://ms.itversity.com/retail_db \
--username retail_user \
--password itversity \
--query "select count(*) from orders"
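
To get the imported side of the comparison, here is a quick count from pyspark (a sketch; the path matches the --target-dir used above):

#Count the avro records sqoop just imported and compare with the sqoop eval count
sqlContext.load("orders", "com.databricks.spark.avro").count()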

1b. Using sqoop, import order_items table into hdfs to folders /user/cloudera/problem1/order-items. Files should be loaded as avro file and use snappy compression

sqoop import \
--connect jdbc:mysql://ms.itversity.com/retail_db \
--username retail_user \
--password itversity \
--table order_items \
--target-dir order_items \
--delete-target-dir \
--as-avrodatafile \
--compress \
--compression-codec="org.apache.hadoop.io.compress.SnappyCodec"

sqoop eval \
--connect jdbc:mysql://ms.itversity.com/retail_db \
--username retail_user \
--password itversity \
--query "select count(*) from order_items"

2. Using Spark Scala, load the data at /user/cloudera/problem1/orders and /user/cloudera/problem1/order-items as data frames.

orders_df = sqlContext.load("orders", "com.databricks.spark.avro")

orders_df.show(3) # will display:
+--------+-------------+-----------------+---------------+
|order_id|   order_date|order_customer_id|   order_status|
+--------+-------------+-----------------+---------------+
|       1|1374724800000|            11599|         CLOSED|
|       2|1374724800000|              256|PENDING_PAYMENT|
|       3|1374724800000|            12111|       COMPLETE|
+--------+-------------+-----------------+---------------+

order_items = sqlContext.load("order_items", "com.databricks.spark.avro")

order_items.show(3) #will display:
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|            1|                  1|                  957|                  1|             299.98|                  299.98|
|            2|                  2|                 1073|                  1|             199.99|                  199.99|
|            3|                  2|                  502|                  5|              250.0|                    50.0|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+

Expected Intermediate Result: Order_Date , Order_status, total_orders, total_amount. In plain english, please find total orders and total amount per status per day. The result should be sorted by order date in descending, order status in ascending and total amount in descending and total orders in ascending. Aggregation should be done using below methods. However, sorting can be done using a dataframe or RDD. Perform aggregation in each of the following ways
a). Just by using Data Frames API - here order_date should be YYYY-MM-DD format

orders_df.registerTempTable("orders_df")
order_items.registerTempTable("order_items_df")

##In the following statement, we need to divide the unix timestamp value by 1000, because order_date is in milliseconds and from_unixtime expects seconds

final_df = sqlContext.sql("""
SELECT from_unixtime(a.order_date/1000, 'yyyy-MM-dd') as Order_Date,
a.order_status Order_status,
count(*) as total_orders,
sum(b.order_item_subtotal) as total_amount
from orders_df a, order_items_df b
where a.order_id = b.order_item_order_id
group by Order_Date, Order_status
order by Order_Date asc, total_amount desc
""")

final_df.show(5)

b). Using Spark SQL - here order_date should be YYYY-MM-DD format
c). By using combineByKey function on RDDs – no need of formatting order_date or total_amount (see the pyspark sketch at the end of this post)

Store the result as parquet file into hdfs using gzip compression under folder
/user/cloudera/problem1/result4a-gzip
/user/cloudera/problem1/result4b-gzip
/user/cloudera/problem1/result4c-gzip

##Answer:
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
final_df.rdd.coalesce(3).toDF().write.parquet("problem1", mode="overwrite")

Store the result as parquet file into hdfs using snappy compression under folder
/user/cloudera/problem1/result4a-snappy
/user/cloudera/problem1/result4b-snappy
/user/cloudera/problem1/result4c-snappy
##Answer
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
final_df.rdd.coalesce(3).toDF().write.parquet("problem1", mode="append")

Store the result as CSV file into hdfs using No compression under folder
/user/cloudera/problem1/result4a-csv
/user/cloudera/problem1/result4b-csv
/user/cloudera/problem1/result4c-csv

##Answer:
rdd_final = final_df.rdd.map(lambda x: ",".join([x.Order_Date, x.Order_status, str(x.total_orders), str(x.total_amount)]))

#You cannot save the above RDD to the problem1 folder (the directory already exists), and I am not sure how to append to the problem1 files. So I am saving to a problem2 directory and then moving the files to problem1.

rdd_final.coalesce(3).saveAsTextFile("problem2", compressionCodecClass=None)

hdfs dfs -mv problem2/* problem1/

create a mysql table named result and load data from /user/cloudera/problem1/result4a-csv to mysql table named result

sqoop list-databases \
--connect jdbc:mysql://ms.itversity.com \
--username retail_user \
--password itversity

sqoop eval \
--connect jdbc:mysql://ms.itversity.com/retail_export \
--username retail_user \
--password itversity \
--query "CREATE TABLE retail_export.sekhar_orders_1 (order_date date, order_status varchar(20), total_orders bigint, total_amount double)"

sqoop export \
--connect jdbc:mysql://ms.itversity.com/retail_export \
--username retail_user \
--password itversity \
--table sekhar_orders_1 \
--export-dir "/user/sekharmekala/problem1/part-0*"

sqoop eval \
--connect jdbc:mysql://ms.itversity.com/retail_export \
--username retail_user \
--password itversity \
--query "select * from sekhar_orders_1 limit 10"
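
For part c) above (aggregation with combineByKey on RDDs), which the SQL solution does not cover, here is a rough pyspark sketch. It is not tested on the cluster and assumes orders_df and order_items are the data frames loaded earlier in this post, so use it at your own risk:

#Key orders by order_id, carrying (order_date, order_status) as the value
orders_kv = orders_df.rdd.map(lambda o: (o.order_id, (o.order_date, o.order_status)))
#Key order items by their order id, carrying the subtotal as the value
items_kv = order_items.rdd.map(lambda i: (i.order_item_order_id, i.order_item_subtotal))

#join gives (order_id, ((order_date, order_status), subtotal));
#re-key by (order_date, order_status) with the subtotal as the value
per_status = orders_kv.join(items_kv).map(lambda kv: (kv[1][0], kv[1][1]))

#combineByKey accumulates (total_orders, total_amount) per key;
#this counts joined rows, which matches the count(*) in the SQL above
agg = per_status.combineByKey(
    lambda sub: (1, sub),                          #createCombiner
    lambda acc, sub: (acc[0] + 1, acc[1] + sub),   #mergeValue
    lambda a, b: (a[0] + b[0], a[1] + b[1]))       #mergeCombiners

agg.take(5)
#The sorting asked for in the problem can then be done with sortBy on the aggregated RDD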


#9

this is great! thanks a lot … currently working through these … have you only completed problem 1 via pyspark?


#10

Yes, and we will come up with solutions for the others as well.