Exercise 02 - Get details of inactive customers

pyspark
spark-shell
apache-spark
scala
python

#22

Just wondering here… orders = sc.textFile("Location") will read the file from an HDFS directory, but the problem statement is to read from a local directory. I guess we have to use the Python open command here, which will give a list of lines that can then be converted into an RDD.

orders = open("/data/retail_db/orders/part-00000").read().splitlines()
ordersRDD = sc.parallelize(orders)
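
By the way, sc.textFile can also read straight from the local filesystem with a file:// URI, as long as the path is visible wherever the tasks run (or you are in local mode):

orders = sc.textFile("file:///data/retail_db/orders/part-00000")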


#23

Yeah, there are a total of 1736 "Smith, Mary" entries in the customers data, each with a different customer id.
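
If you want to verify that, a quick count along these lines should do it (a sketch, assuming a customers RDD as in the posts below and plain unquoted comma-separated fields):

customers.filter(lambda i: i.split(",")[1] == "Mary" and i.split(",")[2] == "Smith").count()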


#24

copy the data to HDFS
hadoop fs -rm -R /user/user_id/test_retail_db
hadoop fs -copyFromLocal /data/retail_db /user/user_id/test_retail_db

read the HDFS files
orders = sc.textFile("/user/user_id/test_retail_db/orders")
customers=sc.textFile("/user/user_id/test_retail_db/customers")

convert to DataFrames
from pyspark.sql import Row
ordersDf = orders.map(lambda i: Row(order_customer_id=i.split(",")[2])).toDF()
customerDf = customers.map(lambda i: i.split(",")).map(lambda c: Row(customer_id=c[0], customer_fname=c[1], customer_lname=c[2])).toDF()

register as temp tables
ordersDf.registerTempTable("orders")
customerDf.registerTempTable("customers")
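
Note: registerTempTable is deprecated from Spark 2.0 onwards; on newer versions the equivalent is createOrReplaceTempView:

ordersDf.createOrReplaceTempView("orders")
customerDf.createOrReplaceTempView("customers")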

use a SQL query to join

customerData = sqlContext.sql("select customer_lname, customer_fname, order_customer_id from customers left outer join orders on customer_id = order_customer_id where order_customer_id is NULL order by customer_lname, customer_fname")
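
The same join can also be expressed with the DataFrame API instead of SQL. A sketch, using the ordersDf/customerDf defined above (inactiveDf is just my name for the result):

inactiveDf = customerDf.join(ordersDf, customerDf.customer_id == ordersDf.order_customer_id, "left_outer") \
    .filter(ordersDf.order_customer_id.isNull()) \
    .select("customer_lname", "customer_fname") \
    .orderBy("customer_lname", "customer_fname")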

convert the dataframe to an RDD
customerDataRDD = customerData.rdd.map(lambda i: i[0] + ', ' + i[1]).coalesce(1)

save the file to hdfs
customerDataRDD.saveAsTextFile("/user/user_id/solutions/solutions02/inactive_customers")
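
To spot-check the output (coalesce(1) leaves a single part-00000 under that path):

hadoop fs -cat /user/user_id/solutions/solutions02/inactive_customers/part-00000 | head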


Core API
copy the data to HDFS
hadoop fs -rm -R /user/user_id/test_retail_db
hadoop fs -copyFromLocal /data/retail_db /user/user_id/test_retail_db

read the HDFS files
orders = sc.textFile("/user/user_id/test_retail_db/orders")
customers=sc.textFile("/user/user_id/test_retail_db/customers")

map to pair RDDs keyed by customer id
ordersMap = orders.map(lambda i: (int(i.split(",")[2]), i.split(",")[2]))
customersMap = customers.map(lambda i: i.split(",")).map(lambda c: (int(c[0]), (c[1], c[2])))

left outer join
customerList=customersMap.leftOuterJoin(ordersMap)
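
Each joined record now has the shape (customer_id, ((fname, lname), order_customer_id or None)), so a customer with no orders looks something like this (illustrative values only):

(9000, (('Mary', 'Smith'), None))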

filter for customers with no matching order (None on the orders side)
customerInactiveList = customerList.filter(lambda i: i[1][1] is None)

for i in customerInactiveList.take(10): print(i)

sort by last name and first name (key each record by (lname, fname))
sortList = customerInactiveList.map(lambda i: ((i[1][0][1], i[1][0][0]), i))
sortInactiveList = sortList.sortByKey()

for i in sortInactiveList.take(100): print(i)

save as per requirement (pull lname, fname back out of the nested tuple)
finalInactiveList = sortInactiveList.map(lambda i: i[1][1][0][1] + ', ' + i[1][1][0][0]).coalesce(1)

for i in finalInactiveList.take(100): print(i)

save to location
finalInactiveList.saveAsTextFile("/user/user_id/solutions1/solutions02/inactive_customers")
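
As a sanity check, this run should agree with the SQL-based one above (assuming customerDataRDD is still in scope):

print(customerDataRDD.count() == finalInactiveList.count())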


#25

SPARK SCALA CORE API

Instead of a join, this collects the distinct customer ids from orders into a set on the driver and filters the customers RDD against it.

val orders = sc.textFile("")
val customers = sc.textFile("")

// set of customer ids that appear in the orders data, collected to the driver
val set_orders = orders.map(row => row.split(",")(2)).distinct().collect().toSet
// pair RDD of (customerId, (l_name, f_name)) from the customers data
val rdd_customers = customers.map(row => (row.split(",")(0), (row.split(",")(2), row.split(",")(1))))

// keep only customers whose id never appears in orders, then drop the key
val result = rdd_customers.filter(row => !set_orders.contains(row._1)).map(row => row._2)

val result_f = result.map(row => row._1 + ", " + row._2)

result_f.coalesce(1).saveAsTextFile("")