My solutions in pyspark to Arun's Blog questions

#1

solution for problem1 in Arun’s Blog using pyspark

step1:

sqoop import --connect jdbc:mysql://quickstart.cloudera/retail_db --username root --password cloudera --table orders --target-dir /user/cloudera/arun/problem1/orders --as-avrodatafile -z --compression-codec snappy

step2:

sqoop import --connect jdbc:mysql://quickstart.cloudera/retail_db --username root --password cloudera --table order_items --target-dir /user/cloudera/arun/problem1/order_items --as-avrodatafile -z --compression-codec snappy

step3:

orders=sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/arun/problem1/orders")

order_items=sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/arun/problem1/order_items")

step4:

a) Just by using the Data Frames API - here order_date should be in YYYY-MM-DD format

from pyspark.sql.functions import *

xjoin=orders.join(order_items,orders.order_id==order_items.order_item_order_id)

dfResult=xjoin.groupBy(to_date(from_unixtime(xjoin.order_date/1000)).alias('order_date'),'order_status').agg(countDistinct(xjoin.order_id).alias('total_orders'),round(sum(xjoin.order_item_subtotal),2).alias('total_amount')).sort(['order_date','order_status','total_amount','total_orders'],ascending=[0,1,0,1])

b) Using Spark SQL - here order_date should be in YYYY-MM-DD format

xjoin.registerTempTable("prince")

Sqlresult1=sqlContext.sql("select to_date(from_unixtime(order_date/1000)) as order_date,order_status,count(distinct(order_id)) total_orders,cast(sum(order_item_subtotal) as decimal(10,2)) as total_amount from prince group by order_date,order_status order by order_date desc,order_status,total_orders,total_amount desc")

c) By using the combineByKey function on RDDs - no need to format order_date or total_amount

from pyspark.sql.functions import *

rdd1=orders.rdd.map(lambda x: (x[0],(x[1],x[3])))   #(order_id, (order_date, order_status))
rdd2=order_items.rdd.map(lambda x: (x[1],x[4]))   #(order_item_order_id, order_item_subtotal)
rddjoin=rdd1.join(rdd2)

rddkey=rddjoin.map(lambda x: ((x[1][0][0],x[1][0][1]),([x[0]],x[1][1])))   #key=(order_date,order_status), value=([order_id], subtotal)

#Note:- since we want to count distinct order_id values, we collect them into a set during aggregation. The order_id is wrapped in a list (i.e. [order_id] above) so that set() can consume it; otherwise python throws: TypeError: 'int' object is not iterable. An alternative that avoids the wrapper is sketched after the aggregateByKey result below.

=====> using aggregateByKey:-

initial=(set(),0)
seqop=lambda x,y: (x[0]|set(y[0]),x[1]+y[1])
combop=lambda x,y: (x[0]|set(y[0]),x[1]+y[1])

agg=rddkey.aggregateByKey(initial,seqop,combop)

aggResult=agg.map(lambda x: (x[0][0],x[0][1],len(x[1][0]),x[1][1])).toDF().sort([col('_1'),col('_2'),col('_3'),col('_4')],ascending=[0,1,1,0])

aggResult.show()
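
As an alternative sketch (my own variant, not part of the original solution), order_id can stay a plain int if the set is built with set([...]) inside the aggregation functions; the names rddkey2/agg2 below are only illustrative:

rddkey2=rddjoin.map(lambda x: ((x[1][0][0],x[1][0][1]),(x[0],x[1][1])))   #value is (order_id, subtotal), no list wrapper
initial2=(set(),0.0)
seqop2=lambda acc,v: (acc[0]|set([v[0]]),acc[1]+v[1])   #add the single order_id via set([...])
combop2=lambda a,b: (a[0]|b[0],a[1]+b[1])
agg2=rddkey2.aggregateByKey(initial2,seqop2,combop2)
agg2.map(lambda x: (x[0][0],x[0][1],len(x[1][0]),x[1][1])).take(5)   #same shape as aggResult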

=====> using combineByKey:-

combiner=lambda x: (set(x[0]),x[1])
mergeValue=lambda x,y: (x[0]|set(y[0]),x[1]+y[1])
mergeCombiner=lambda x,y: (x[0]|y[0],x[1]+y[1])

comb=rddkey.combineByKey(combiner,mergeValue,mergeCombiner)

combResult=comb.map(lambda x: (x[0][0],x[0][1],len(x[1][0]),x[1][1])).toDF().sort([col('_1'),col('_2'),col('_3'),col('_4')],ascending=[0,1,1,0])

combResult.show()

step5:

#Store the result as parquet file into hdfs using gzip compression:

sqlContext.setConf("spark.sql.parquet.compression.codec","gzip")

dfResult.write.parquet("/user/cloudera/arun/problem1/result4a-gzip")

Sqlresult1.write.parquet("/user/cloudera/arun/problem1/result4b-gzip")

combResult.write.parquet("/user/cloudera/arun/problem1/result4c-gzip")

step6:

#Store the result as parquet file into hdfs using snappy compression:

sqlContext.setConf("spark.sql.parquet.compression.codec","snappy")

dfResult.write.parquet("/user/cloudera/arun/problem1/result4a-snappy")

Sqlresult1.write.parquet("/user/cloudera/arun/problem1/result4b-snappy")

combResult.write.parquet("/user/cloudera/arun/problem1/result4c-snappy")

step7:

#Store the result as CSV file into hdfs using No compression:

dfResult.rdd.map(lambda x: ",".join(str(i) for i in x)).saveAsTextFile("/user/cloudera/arun/problem1/result4a-csv")

Sqlresult1.rdd.map(lambda x: ",".join(str(i) for i in x)).saveAsTextFile("/user/cloudera/arun/problem1/result4b-csv")

combResult.rdd.map(lambda x: ",".join(str(i) for i in x)).saveAsTextFile("/user/cloudera/arun/problem1/result4c-csv")
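
Before the export in step 8, the CSV output can be spot-checked (assuming the usual part-file naming used by saveAsTextFile):

hdfs dfs -cat /user/cloudera/arun/problem1/result4a-csv/part-00000 | head -5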

step8:

create a mysql table named result and export the data from /user/cloudera/arun/problem1/result4a-csv into it

log on to mysql in the Cloudera VM

mysql> create table mydb.result(order_date varchar(50), order_status varchar(50),total_orders int,total_amount float);

sqoop export --connect jdbc:mysql://quickstart.cloudera/mydb --username root --password cloudera --table result --export-dir /user/cloudera/arun/problem1/result4a-csv --input-fields-terminated-by ","
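
Optionally, verify the export with a quick row count (this sqoop eval call is my own check, not part of the original problem):

sqoop eval --connect jdbc:mysql://quickstart.cloudera/mydb --username root --password cloudera --query "select count(*) from result"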


#2

solution for problem2 in Arun’s Blog using pyspark

step1:

sqoop import --connect jdbc:mysql://quickstart.cloudera/retail_db --username root --password cloudera --table products --target-dir /user/cloudera/arun/products --fields-terminated-by "|"

step2:

hdfs dfs -mkdir /user/cloudera/arun/problem2/
hdfs dfs -mv /user/cloudera/arun/products /user/cloudera/arun/problem2/products

step3:

hdfs dfs -chmod -R 765 /user/cloudera/arun/problem2/products
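
Optionally verify the permission change:

hdfs dfs -ls -d /user/cloudera/arun/problem2/products
hdfs dfs -ls /user/cloudera/arun/problem2/products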

step4:

a) =====> dataframes api

productRdd=sc.textFile("/user/cloudera/arun/problem2/products")

for i in productRdd.take(10): print i

from pyspark.sql import Row
from pyspark.sql.functions import *

productDF=productRdd.map(lambda x: Row(product_id=int(x.split("|")[0]), product_category_id= int(x.split("|")[1]), product_name=x.split("|")[2], product_price=float(x.split("|")[4]))).toDF()
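
Equivalently, each line can be split just once; parse_product below is a helper name I am introducing for illustration (it reuses the Row import above) and produces the same productDF:

def parse_product(line):
    f=line.split("|")   #split each line once instead of four times
    return Row(product_id=int(f[0]), product_category_id=int(f[1]), product_name=f[2], product_price=float(f[4]))

productDF=productRdd.map(parse_product).toDF()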

dfresult=productDF.filter(productDF.product_price<100).groupBy('product_category_id').agg(max(productDF.product_price).alias('MaxPrice'),countDistinct(productDF.product_id).alias('Totalproducts'), round(avg(productDF.product_price),2).alias('AveragePrice'), min(productDF.product_price).alias('MinPrice')).sort(productDF.product_category_id.desc())

b) =====> spark sql

productDF.registerTempTable("products")

sqlResult=sqlContext.sql("select product_category_id, max(product_price) as MaxPrice, count(distinct(product_id)) as Totalproducts, round(avg(product_price),2) as AveragePrice, min(product_price) as MinPrice from products where product_price<100 group by product_category_id order by product_category_id desc")

c) =====> RDDs aggregateByKey method

We need only product_category_id, product_id and product_price from the RDD, and we keep only records with product_price < 100:

dataRdd=productRdd.map(lambda x: (int(x.split("|")[1]),(int(x.split("|")[0]),float(x.split("|")[4])))).filter(lambda x: x[1][1]<100)

For max and min we need only product_category_id and product_price:

maxRdd=dataRdd.map(lambda x: (x[0],x[1][1])).reduceByKey(lambda a,b:a if a>b else b)

minRdd=dataRdd.map(lambda x: (x[0],x[1][1])).reduceByKey(lambda a,b:a if a<b else b)

Count and sum are needed to calculate the average and the total number of products. Since product_id is already distinct, an empty list (rather than a set) is enough as the initial value for counting:

initial=([],0)
seqop=lambda x,y: (x[0]+[y[0]],x[1]+y[1])
combop=lambda x,y: (x[0]+y[0],x[1]+y[1])

avgRdd=dataRdd.aggregateByKey(initial,seqop,combop).map(lambda (k,v):(k,len(v[0]),v[1])).map(lambda x: (x[0],(x[1],x[2]/x[1])))

#join Rdd by category_id as key to get final result as [product_category_id, MaxPrice,Totalproducts,AveragePrice,MinPrice]

finalrdd= maxRdd.join(avgRdd).join(minRdd).sortByKey(False)

rddResult=finalrdd.map(lambda x:(x[0],x[1][0][0],x[1][0][1][0],x[1][0][1][1],x[1][1]))
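
Optionally (my own addition), attach column names when converting to a DataFrame so the avro files written in step 5 carry readable field names:

rddResultDF=rddResult.toDF(['product_category_id','MaxPrice','Totalproducts','AveragePrice','MinPrice'])
rddResultDF.show(5)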

step5:

store the result in avro file using snappy compression under these folders respectively
/user/cloudera/problem2/products/result-df
/user/cloudera/problem2/products/result-sql
/user/cloudera/problem2/products/result-rdd

sqlContext.setConf("spark.sql.avro.compression.codec","snappy")

dfresult.write.format("com.databricks.spark.avro").save("/user/cloudera/arun/problem2/products/result-df")

sqlResult.write.format("com.databricks.spark.avro").save("/user/cloudera/arun/problem2/products/result-sql")

rddResult.toDF().write.format("com.databricks.spark.avro").save("/user/cloudera/arun/problem2/products/result-rdd")


#3

solution for problem5 in Arun’s Blog using pyspark

step1 :

sqoop import --connect jdbc:mysql://quickstart.cloudera/retail_db --username root --password cloudera --table orders --target-dir /user/cloudera/arun/problem5/text --fields-terminated-by "\t" --lines-terminated-by "\n"

step2:

sqoop import --connect jdbc:mysql://quickstart.cloudera/retail_db --username root --password cloudera --table orders --target-dir /user/cloudera/arun/problem5/avro --as-avrodatafile

step3:

sqoop import --connect jdbc:mysql://quickstart.cloudera/retail_db --username root --password cloudera --table orders --target-dir /user/cloudera/arun/problem5/parquet --as-parquetfile

step4:

a) save the data to hdfs using snappy compression as parquet file at /user/cloudera/arun/problem5/parquet-snappy-compress

avrodata=sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/arun/problem5/avro/")

sqlContext.setConf("spark.sql.parquet.compression.codec","snappy")

avrodata.write.parquet("/user/cloudera/arun/problem5/parquet-snappy-compress")

b) save the data to hdfs using gzip compression as text file at /user/cloudera/arun/problem5/text-gzip-compress

avrodata.rdd.map(lambda x: ",".join(str(i) for i in x)).saveAsTextFile("/user/cloudera/arun/problem5/text-gzip-compress","org.apache.hadoop.io.compress.GzipCodec")

c) save the data to hdfs using no compression as sequence file at /user/cloudera/arun/problem5/sequence

avrodata.rdd.map(lambda x: (x[0],",".join(str(i) for i in x))).saveAsSequenceFile("/user/cloudera/arun/problem5/sequence")

d) save the data to hdfs using snappy compression as text file at /user/cloudera/arun/problem5/text-snappy-compress

avrodata.rdd.map(lambda x: ",".join(str(i) for i in x)).saveAsTextFile("/user/cloudera/arun/problem5/text-snappy-compress","org.apache.hadoop.io.compress.SnappyCodec")

step5:

a) save the data to hdfs using no compression as parquet file at /user/cloudera/problem5/parquet-no-compress

parqdata=sqlContext.read.parquet("/user/cloudera/arun/problem5/parquet-snappy-compress")

sqlContext.setConf("spark.sql.parquet.compression.codec","uncompressed")

parqdata.write.parquet("/user/cloudera/problem5/parquet-no-compress")

b) save the data to hdfs using snappy compression as avro file at /user/cloudera/arun/problem5/avro-snappy

sqlContext.setConf("spark.sql.avro.compression.codec","snappy")

parqdata.write.format("com.databricks.spark.avro").save("/user/cloudera/arun/problem5/avro-snappy")

step6:

a) save the data to hdfs using no compression as json file at /user/cloudera/arun/problem5/json-no-compress

avroSnappyDF=sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/arun/problem5/avro-snappy")

avroSnappyDF.write.json("/user/cloudera/arun/problem5/json-no-compress")

b) save the data to hdfs using gzip compression as json file at /user/cloudera/arun/problem5/json-gzip

avroSnappyDF.toJSON().saveAsTextFile("/user/cloudera/arun/problem5/json-gzip","org.apache.hadoop.io.compress.GzipCodec")

step7:

save the data as comma-separated text using gzip compression at /user/cloudera/arun/problem5/csv-gzip

jsonData=sqlContext.read.json("/user/cloudera/arun/problem5/json-gzip")

#Note: the line below works in pyspark2; the csv format is not supported natively in spark 1.6

jsonData.write.csv("/user/cloudera/arun/problem5/csv-gzip",compression="gzip")

#To save in spark 1.6, convert to an RDD and then saveAsTextFile

jsonData.rdd.map(lambda x: ",".join(str(i) for i in x)).saveAsTextFile("/user/cloudera/arun/problem5/csv-gzip_123","org.apache.hadoop.io.compress.GzipCodec")

step8:

seqRdd=sc.sequenceFile("/user/cloudera/arun/problem5/sequence")

Convert the RDD to a DataFrame so it can be saved in ORC format. From the sequence file we need only the value, because the key is order_id and the value already contains order_id, so the key can be dropped:

from pyspark.sql import Row

seqDF=seqRdd.map(lambda x: x[1]).map(lambda x: Row(order_id= int(x.split(",")[0]), order_date= x.split(",")[1], order_customer_id= int(x.split(",")[2]), order_status= x.split(",")[3])).toDF()

seqDF.write.orc("/user/cloudera/arun/problem5/orc")
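
A quick read-back check (optional; it assumes sqlContext has Hive support for ORC, which the CDH pyspark shell provides):

orcDF=sqlContext.read.orc("/user/cloudera/arun/problem5/orc")
orcDF.printSchema()
orcDF.show(5)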


#4

solution for problem6 in Arun’s Blog using pyspark

Step1:

create a hive metastore database named problem6 and import all tables from the mysql retail_db database into the hive metastore.

hive> create database problem6;

sqoop import-all-tables --connect jdbc:mysql://quickstart.cloudera/retail_db --username root --password cloudera --hive-import --hive-database problem6
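
To confirm the import worked (an optional check from the hive shell):

hive> use problem6;
hive> show tables;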

step2: On the spark shell, use the data available in the metastore as the source and perform steps 3, 4, 5 and 6.

step3: Rank products within each department by price and order by department ascending and rank descending

===>using dataframes API

pro=sqlContext.read.table("problem6.products")
cat=sqlContext.read.table("problem6.categories")
dept=sqlContext.read.table("problem6.departments")

xjoin=pro.join(cat,pro.product_category_id==cat.category_id).join(dept,dept.department_id==cat.category_department_id).drop('category_id','department_id')

from pyspark.sql.functions import *
from pyspark.sql.window import *

spec= Window.partitionBy(xjoin.category_department_id).orderBy(xjoin.product_price.desc())

pro_rank=xjoin.select('product_id','product_name','product_price','department_name','category_department_id').withColumn('Rank',rank().over(spec)).sort(xjoin.category_department_id,col('Rank').desc())

===>using spark sql

xjoin.registerTempTable('pro_cat_dept')

sql_pro_rank=sqlContext.sql("select product_id,product_name,product_category_id,product_price,category_department_id,department_name, rank() over(partition by category_department_id order by product_price desc) as Rank from pro_cat_dept order by category_department_id asc, Rank desc ")

step 4:

Find the top 10 customers with the most unique product purchases. If more than one customer has the same number of product purchases, the customer with the lowest customer_id takes precedence. [This proves you can produce aggregate statistics on joined datasets.]

===>using dataframes API

cust=sqlContext.read.table("problem6.customers")
orders=sqlContext.read.table("problem6.orders")
order_items=sqlContext.read.table("problem6.order_items")

yjoin=orders.join(cust,cust.customer_id==orders.order_customer_id).join(order_items,order_items.order_item_order_id==orders.order_id)

top10CustDF=yjoin.groupBy('customer_id','customer_fname','customer_lname').agg(countDistinct('order_item_product_id').alias('total_unique_products')).sort(col('total_unique_products').desc(),'customer_id').limit(10)

===>using spark sql

yjoin.registerTempTable("cust_orders_items")

top10custSql=sqlContext.sql("select customer_id,customer_fname,customer_lname, count(distinct(order_item_product_id)) as total_unique_products from cust_orders_items group by customer_id,customer_fname,customer_lname order by total_unique_products desc, customer_id limit 10 ")

step 5:

On the dataset from step 3, apply a filter such that only products priced below 100 are extracted.

===>using dataframes API

data=pro_rank.filter(pro_rank.product_price<100)

===>using spark sql

sql_pro_rank_filter=sqlContext.sql("select product_id,product_name,product_category_id,product_price,category_department_id,department_name, rank() over(partition by category_department_id order by product_price desc) as Rank from pro_cat_dept where product_price<100 order by category_department_id asc, Rank desc")

step 6:

On the dataset from step 4, extract details of products purchased by the top 10 customers which are priced at less than 100 USD per unit.

===>using dataframes API

top10join= top10CustDF.join(yjoin, top10CustDF.customer_id==yjoin.order_customer_id).select(top10CustDF.customer_id,top10CustDF.customer_fname,yjoin.order_item_product_id).join(pro,pro.product_id==yjoin.order_item_product_id)

top10_used_productdetails=top10join.filter(pro.product_price<100).select('product_id','product_category_id','product_name','product_price','product_image').dropDuplicates(['product_id','product_category_id'])

===>using spark sql

yjoin.registerTempTable("cust_orders_items")
top10custSql.registerTempTable("top10Cust")
pro.registerTempTable("products")

top10Sql_ProDetails=sqlContext.sql("select distinct X.* from products X join cust_orders_items Y on Y.order_item_product_id=X.product_id join top10Cust Z on Z.customer_id=Y.order_customer_id where X.product_price<100")

step 7:

#Store the results of steps 5 and 6 in new metastore tables within hive

sql_pro_rank_filter.write.saveAsTable("problem6.product_rank_result")

top10Sql_ProDetails.write.saveAsTable("problem6.top_products")
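
A quick check (my addition) that both tables exist and hold data:

sqlContext.sql("show tables in problem6").show()
sqlContext.read.table("problem6.top_products").show(5)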
