Problem Scenario 1 from Arun's Blog - pyspark

#1

Hi everybody,

Could someone share their solution using combineByKey for Scenario 1 from Arun's Blog?

Thank you


#2

from pyspark.sql.functions import *

# read the Avro inputs as DataFrames
orders = sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/arun/problem1/orders")

order_items = sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/arun/problem1/order_items")
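
Aside (my note, not from the original post): the com.databricks.spark.avro format needs the spark-avro package on the classpath, so pyspark is typically launched with something like the following (the version shown is illustrative):

pyspark --packages com.databricks:spark-avro_2.10:2.0.1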

# (order_id, (order_date, order_status)) from orders
rdd1 = orders.rdd.map(lambda x: (x[0], (x[1], x[3])))
# (order_id, order_item_subtotal) from order_items; note .rdd is needed here as well
rdd2 = order_items.rdd.map(lambda x: (x[1], x[4]))
rddjoin = rdd1.join(rdd2)

# key by (order_date, order_status); value is ([order_id], order_item_subtotal)
rddkey = rddjoin.map(lambda x: ((x[1][0][0], x[1][0][1]), ([x[0]], x[1][1])))

“”" Note:- since we want to count distinct order_id. For that we are using set, to provide order_id as input to set in aggregation, enclose order_id in list (i.e.[order_id] as shown in above). Else python will throw following error, TypeError: ‘int’ object is not iterable “”"

Using aggregateByKey:

# accumulator: (set of distinct order_ids, running revenue)
initial = (set(), 0)
# seqop folds one value into the accumulator within a partition
seqop = lambda x, y: (x[0] | set(y[0]), x[1] + y[1])
# combop merges accumulators across partitions (y[0] is already a set,
# so set(y[0]) is a harmless no-op here)
combop = lambda x, y: (x[0] | set(y[0]), x[1] + y[1])

agg = rddkey.aggregateByKey(initial, seqop, combop)

aggResult = agg.map(lambda x: (x[0][0], x[0][1], len(x[1][0]), x[1][1])).toDF() \
    .sort([col('_1'), col('_2'), col('_3'), col('_4')], ascending=[0, 1, 1, 0])

aggResult.show()
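
To see what the (set, sum) accumulator does, here is a toy run of the same seqop/combop on hypothetical local data (assuming a SparkContext named sc):

pairs = sc.parallelize([(("2014-01-01", "CLOSED"), ([1], 10.0)),
                        (("2014-01-01", "CLOSED"), ([1], 20.0)),
                        (("2014-01-01", "CLOSED"), ([2], 5.0))])
pairs.aggregateByKey((set(), 0), seqop, combop).collect()
# [(('2014-01-01', 'CLOSED'), ({1, 2}, 35.0))]  -> 2 distinct orders, 35.0 revenue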


Using combineByKey:

# createCombiner: turn the first value seen for a key into (set, subtotal)
combiner = lambda x: (set(x[0]), x[1])
# mergeValue: fold another raw value into an existing combiner
mergeValue = lambda x, y: (x[0] | set(y[0]), x[1] + y[1])
# mergeCombiner: merge two combiners; both sides already hold sets,
# so no set() conversion is needed
mergeCombiner = lambda x, y: (x[0] | y[0], x[1] + y[1])

comb = rddkey.combineByKey(combiner, mergeValue, mergeCombiner)

combResult = comb.map(lambda x: (x[0][0], x[0][1], len(x[1][0]), x[1][1])).toDF() \
    .sort([col('_1'), col('_2'), col('_3'), col('_4')], ascending=[0, 1, 1, 0])

combResult.show()
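
As a readability tweak (my addition, not part of the original solution), toDF can take explicit column names so the sort no longer relies on the positional _1 to _4 names; the names below are just illustrative:

combNamed = comb.map(lambda x: (x[0][0], x[0][1], len(x[1][0]), x[1][1])) \
    .toDF(["order_date", "order_status", "distinct_orders", "total_revenue"])
combNamed.sort(["order_date", "order_status", "distinct_orders", "total_revenue"],
               ascending=[0, 1, 1, 0]).show()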
