Exercise 02 - Get details of inactive customers

pyspark
scala
apache-spark
spark-shell
python
#22

Just wondering here… orders = sc.textFile("Location") will read the file from an HDFS directory, but the problem statement asks us to read from a local directory. I guess we have to use Python's open command here, which will give us a list that we can then convert into an RDD.

orders=open("/data/retail_db/orders/part-00000").read().splitlines()  # splitlines() drops the trailing newlines that readlines() would keep
ordersRDD = sc.parallelize(orders)
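For what it's worth, sc.textFile can also read a local path directly if you give it a file:// URI; that works as long as the path is visible wherever the executors run (always true in local mode). A minimal sketch of that alternative:

# reads the local file straight into an RDD, no open()/parallelize() needed
ordersRDD = sc.textFile("file:///data/retail_db/orders/part-00000")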


#23

Yeah. There are a total of 1736 "Smith, Mary" entries in the customer database, each with a different customer id.
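If anyone wants to double-check that number, a quick count will do (the HDFS path here is just an assumption, point it at wherever your customers data lives):

customers = sc.textFile("/user/user_id/test_retail_db/customers")
# customer_fname is column 1, customer_lname is column 2
print customers.filter(lambda c: c.split(",")[1] == "Mary" and c.split(",")[2] == "Smith").count()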


#24

get the file in hdfs
hadoop fs -rm -R /user/user_id/test_retail_db
hadoop fs -copyFromLocal /data/retail_db /user/user_id/test_retail_db

read hdfs file
orders = sc.textFile("/user/user_id/test_retail_db/orders")
customers=sc.textFile("/user/user_id/test_retail_db/customers")

convert to dataframe
from pyspark.sql import Row
ordersDf=orders.map(lambda i:Row(order_customer_id=i.split(",")[2])).toDF()
customerDf=customers.map(lambda i:Row(customer_id=i.split(",")[0],customer_fname=i.split(",")[1],customer_lname=i.split(",")[2])).toDF()

register as temp table
ordersDf.registerTempTable("orders")
customerDf.registerTempTable("customers")

use sql query to join

customerData=sqlContext.sql("select customer_lname,customer_fname,order_customer_id from customers left outer join orders on customer_id=order_customer_id where order_customer_id is NULL order by customer_lname,customer_fname")

convert the dataframe to an RDD
customerDataRDD=customerData.rdd.map(lambda i: i[0] + ', '+i[1]).coalesce(1)

save the file to hdfs
customerDataRDD.saveAsTextFile("/user/user_id/solutions/solutions02/inactive_customers")
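spot-check what actually got written (same take/print idiom as in the Core API section below)
for i in sc.textFile("/user/user_id/solutions/solutions02/inactive_customers").take(10):print i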


Core API
get the file in hdfs
hadoop fs -rm -R /user/user_id/test_retail_db
hadoop fs -copyFromLocal /data/retail_db /user/user_id/test_retail_db

read hdfs file
orders = sc.textFile("/user/user_id/test_retail_db/orders")
customers=sc.textFile("/user/user_id/test_retail_db/customers")

map
ordersMap=orders.map(lambda i:(int(i.split(",")[2]),i.split(",")[2]))
customersMap=customers.map(lambda i:(int(i.split(",")[0]),(i.split(",")[1],i.split(",")[2])))

left outer join
customerList=customersMap.leftOuterJoin(ordersMap)
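Each joined record now looks like (customer_id, ((customer_fname, customer_lname), order_side_value)), and customers with no matching order carry None as the order_side_value; that None is what the filter below keys on.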

filter for NULL fields
customerInactiveList=customerList.filter(lambda i: i[1][1] is None)

for i in customerInactiveList.take(10):print i

sort by last name and first name
sortList=customerInactiveList.map(lambda i: ((i[1][0][1],i[1][0][0]), i))
sortInactiveList=sortList.sortByKey()

for i in sortInactiveList.take(100):print i

save as per requirement
finalInactiveList=sortInactiveList.map(lambda i: i[1][1][0][1] + ', '+i[1][1][0][0]).coalesce(1)

for i in finalInactiveList.take(100):print i

save to location
finalInactiveList.saveAsTextFile("/user/user_id/solutions1/solutions02/inactive_customers")


#25

SPARK SCALA CORE API

var orders = sc.textFile("")
var customers = sc.textFile("")

// set of customer Ids from orders table
var set_orders = orders.map(row => row.split(",")(2)).distinct().collect().toSet
// pair RDD of (customerId , (l_name, f_name)) from customers table
var rdd_customers = customers.map(row => (row.split(",")(0), (row.split(",")(2), row.split(",")(1))) )

var result = rdd_customers.filter(row => !set_orders.contains(row._1)).map(row => row._2)

var result_f = result.map(row => row._1 + "," + row._2)

result_f.coalesce(1).saveAsTextFile("")
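One design note on this approach: collecting the distinct customer ids from orders into a driver-side Set turns the anti-join into a plain map-side filter, so no shuffle is needed. That is fine while the id set stays small; for a bigger set you would want to wrap it in sc.broadcast so it ships to the executors once rather than with every task closure.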


#26

========================================================================
RDD:

order_read=open("/home/cloudera/data-master/retail_db/orders/part-00000").read().splitlines()
customer_read=open("/home/cloudera/data-master/retail_db/customers/part-00000").read().splitlines()
order_rdd=sc.parallelize(order_read)
customer_rdd=sc.parallelize(customer_read)
orderMap= order_rdd.map(lambda x:(int(x.split(",")[2]),x.split(",")[3]))
customerMap = customer_rdd.map(lambda y:(int(y.split(",")[0]),(y.split(",")[2],y.split(",")[1])))
customerOrderJoin=customerMap.leftOuterJoin(orderMap)
customerFilterWithNoOrder= customerOrderJoin.filter(lambda x:x[1][1] is None)
#sort by customer_lname and then customer_fname
beforeSort=customerFilterWithNoOrder.map(lambda x:x[1][0])
sortResult=beforeSort.sortByKey().map(lambda x:str(x[0])+','+str(x[1]))
sortResult.coalesce(1).saveAsTextFile("/user/cloudera/solutions/solutions02/inactive_customers")

================================================================================================

Dataframes:

from pyspark.sql import Row
order_read=open("/home/cloudera/data-master/retail_db/orders/part-00000").read().splitlines()
customer_read=open("/home/cloudera/data-master/retail_db/customers/part-00000").read().splitlines()
order_rdd=sc.parallelize(order_read)
customer_rdd=sc.parallelize(customer_read)
orderDFMap= order_rdd.map(lambda x:Row(order_customer_id=int(x.split(",")[2]),order_id=int(x.split(",")[0]))).toDF()
customerDFMap = customer_rdd.map(lambda y:Row(customer_id=int(y.split(",")[0]),customer_lname=y.split(",")[2],customer_fname=y.split(",")[1])).toDF()
orderDFMap.registerTempTable("orders")
customerDFMap.registerTempTable("customers")
order_customer_join=sqlContext.sql("select customer_lname,customer_fname from customers left join orders on customer_id=order_customer_id where order_id is null order by customer_lname, customer_fname")
order_final_rdd=order_customer_join.rdd.map(lambda x:str(x[0])+','+str(x[1])).coalesce(1)
order_final_rdd.saveAsTextFile("/user/cloudera/solutions/solutions02/inactive_customers")


#27

(note: in this case, invoke pyspark in local mode so the local files are accessible for the initial RDD creation)

from pyspark.sql import Row
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
# the first map already reduces each record to the customer_id string, so pass it whole to Row (rec[0] would be just its first character)
ordersDF=sc.textFile('file:///data/retail_db/orders').map(lambda rec: rec.split(",")[2]).map(lambda rec: Row(customer_id=rec)).toDF()
customersDF=sc.textFile('file:///data/retail_db/customers').map(lambda rec: (rec.split(",")[2],rec.split(",")[1],rec.split(",")[0])).map(lambda rec: Row(customer_lname=rec[0],customer_fname=rec[1],customer_id=rec[2])).toDF()
ordersDF.registerTempTable('ordersTT')
customersDF.registerTempTable('customersTT')
nonOrderingCustDF=sqlContext.sql('select a.customer_lname,a.customer_fname from customersTT a left outer join ordersTT b on a.customer_id=b.customer_id where b.customer_id is null order by a.customer_lname, a.customer_fname')
nonOrderingCustDF.rdd.map(lambda rec: rec[0] +","+ rec[1]).saveAsTextFile('/tmp/p2')
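Note that setting spark.sql.shuffle.partitions to 1 up front makes the join and the order by produce a single partition, which is why no coalesce(1) is needed before saveAsTextFile.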
