Exercise 02 - Get details of inactive customers

pyspark
spark-shell
apache-spark
scala
python

#1

Before attempting these questions, make sure you prepare by going through appropriate material.


Here are the Udemy coupons for our certification courses. Our coupons include 1 month lab access as well.

  • Click here for $35 coupon for CCA 175 Spark and Hadoop Developer using Python.
  • Click here for $35 coupon for CCA 175 Spark and Hadoop Developer using Scala.
  • Click here for $25 coupon for HDPCD:Spark using Python.
  • Click here for $25 coupon for HDPCD:Spark using Scala.
  • Click here for access to state of the art 13 node Hadoop and Spark Cluster

  • Details - Duration 15 to 20 minutes
    • Data is available in local file system /data/retail_db
    • Source directories: /data/retail_db/orders and /data/retail_db/customers
    • Source delimiter: comma (“,”)
    • Source Columns - orders - order_id, order_date, order_customer_id, order_status
    • Source Columns - customers - customer_id, customer_fname, customer_lname and many more
    • Get the customers who have not placed any orders, sorted by customer_lname and then customer_fname
    • Target Columns: customer_lname, customer_fname
    • Number of files - 1
    • Target Directory: /user/<YOUR_USER_ID>/solutions/solutions02/inactive_customers
    • Target File Format: TEXT
    • Target Delimiter: comma (“, ”)
    • Compression: N/A
  • Validation
  • Solutions
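For reference, once your output is written, one way to validate it against the requirements above is to list the target directory and preview the saved file from pyspark - a minimal sketch, assuming the target directory listed above (replace <YOUR_USER_ID> with your user id):

# From the shell, confirm exactly one part file was written:
#   hdfs dfs -ls /user/<YOUR_USER_ID>/solutions/solutions02/inactive_customers

# From pyspark, preview and count the saved output
# (TEXT format, comma delimited, sorted by customer_lname then customer_fname)
output = sc.textFile("/user/<YOUR_USER_ID>/solutions/solutions02/inactive_customers")
for line in output.take(10):
    print(line)
print(output.count())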

#3

I did not get you. Can you elaborate? It seems you have posted in the incorrect topic :slight_smile:


#4

Core API in Python

pyspark --master yarn --conf spark.ui.port=12643

orders = sc.textFile("Location")
customers = sc.textFile("Location")

ordersMap = orders.map(lambda order: (int(order.split(",")[2]), 1))
customersMap = customers.map(lambda c: (int(c.split(",")[0]), (c.split(",")[2], c.split(",")[1])))

customersLeftOuterJoinOrders = customersMap.leftOuterJoin(ordersMap)

inactiveCustomers = customersLeftOuterJoinOrders.filter(lambda cust: cust[1][1] == None)
for i in inactiveCustomers.take(10): print(i)

nameOfCustomers = inactiveCustomers.map(lambda name: name[1][0][0] + "," + name[1][0][1])

nameOfCustomers.saveAsTextFile("Location")

Data Frames and SQL in Python
pyspark --master yarn --conf spark.ui.port=12643

orders = sc.textFile("Location")
customers = sc.textFile("Location")

from pyspark.sql import Row

ordersDF = orders.map(lambda orders: Row(orders_customer_id = int(orders.split(",")[2]))).toDF()
customersDF = customers.map(lambda customers: Row(customer_id = int(customers.split(",")[0]), lname = customers.split(",")[2], fname = customers.split(",")[1])).toDF()

ordersDF.registerTempTable("orders_df")
customersDF.registerTempTable("customers_df")

sqlContext.setConf("spark.sql.shuffle.partitions", "1")

ordersJoinCustomers = sqlContext.sql("select lname, fname from customers_df left outer join orders_df on customer_id = orders_customer_id where orders_customer_id is null order by lname, fname")

inactiveCustomerNames = ordersJoinCustomers.rdd.map(lambda row: row[0] + ", " + row[1])

inactiveCustomerNames.saveAsTextFile("Location")


#5

import scala.io.Source

val ordersRDD = sc.parallelize(Source.fromFile("/data/retail_db/orders/part-00000").getLines.toList)

val customersRDD = sc.parallelize(Source.fromFile("/data/retail_db/customers/part-00000").getLines.toList)

val ordersDF = ordersRDD.
  map(order => order.split(",")(2).toInt).
  toDF("order_customer_id")

val customersDF = customersRDD.
  map(cust => {
    val data = cust.split(",")
    (data(0).toInt, data(2).trim, data(1).trim)
  }).toDF("customer_id", "customer_lname", "customer_fname")

val inactCustDF = customersDF.join(ordersDF, customersDF("customer_id") === ordersDF("order_customer_id"), "left_outer").
  filter("order_customer_id is null").
  orderBy($"customer_lname", $"customer_fname")

val inactiveCustomersDF = inactCustDF.select($"customer_lname", $"customer_fname")

inactiveCustomersDF.rdd.map(cust => cust.mkString(",")).coalesce(1).
  saveAsTextFile("output_dir")


#6

Spark SQL and Dataframe (in Scala):

val oData = sc.textFile("file:///home/cloudera/retail_db/orders").map(r => { var d = r.split(','); (d(0).toInt, d(2).toInt) })

val cData = sc.textFile("file:///home/cloudera/retail_db/customers").map(r => { var d = r.split(','); (d(0).toInt, d(1).toString, d(2).toString) })

case class ordercc (
order_id: Int ,
order_customer_id: Int
)

case class cuscc (
customer_id: Int,
customer_fname: String,
customer_lname: String
)

val orderDF = oData.map( r => ordercc(r._1, r._2) ).toDF
val cusDF = cData.map( r => cuscc(r._1, r._2, r._3) ).toDF

Spark SQL

orderDF.registerTempTable("ordert")
cusDF.registerTempTable("cust")

val resultSQL = sqlContext.sql("select c.customer_lname, c.customer_fname from ordert o right outer join cust c on o.order_customer_id = c.customer_id where o.order_id is null order by c.customer_lname, c.customer_fname ")

resultSQL.map(r => r(0)+","+r(1)).repartition(1).saveAsTextFile("/user/cloudera/solutions/solutions02/inactive_customers")

Data Frame

val orderJoin = orderDF.join(cusDF, orderDF("order_customer_id") === cusDF("customer_id"), "right_outer")

orderJoin.filter("order_id is null").
  select(col("customer_lname"), col("customer_fname")).
  orderBy(col("customer_lname"), col("customer_fname")).
  map(r => r(0) + "," + r(1)).
  repartition(1).
  saveAsTextFile("/user/cloudera/solutions/solutions02/inactive_customers_DF")


#7

Hi,
Please check my code in pyspark and let me know if it needs any correction or improvement.
However, I couldn't sort on customer_fname.
Please tell me how it can be achieved.

ordersRdd = sc.textFile("/data/retail_db/orders")
ordersMap = ordersRdd.map(lambda x: (int(x.split(",")[2]), int(x.split(",")[0])))

customersRdd = sc.textFile("/data/retail_db/customers")
customersMap = customersRdd.map(lambda x: (int(x.split(",")[0]), (x.split(",")[2],x.split(",")[1])))

customersLeftJoinOrders = customersMap.leftOuterJoin(ordersMap)

for i in customersLeftJoinOrders.take(20): print(i)
customersLeftJoinOrdersNoOrders = customersLeftJoinOrders.filter(lambda x: x[1][1] == None)

customerNames = customersLeftJoinOrdersNoOrders.map(lambda x: x[1][0])

customerNamesSorted = customerNames.sortByKey().map(lambda x: str(x[0])+","+str(x[1]))
for i in customerNamesSorted.collect(): print(i)

customerNamesSorted.saveAsTextFile("/user/aparna149/solutions/solutions02/inactive_customers")

Thanks in advance,
Aparna Sen


#8

Can someone confirm whether the output result set count is 30? I see Smith,Mary multiple times, hence asking.


#9

Hi,
I too got a count of 30 in my final result.
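For what it is worth, a quick way to check that the repeated Smith,Mary rows are genuinely different customers is to keep customer_id alongside the names before saving - a sketch reusing the RDDs from the pyspark solutions above (customersMap and ordersMap are assumed to have the same shapes as in those posts):

# customersMap: (customer_id, (customer_lname, customer_fname))
# ordersMap:    (order_customer_id, 1)
inactive = customersMap.leftOuterJoin(ordersMap).filter(lambda c: c[1][1] is None)

# total number of inactive customers (should match the count you got, e.g. 30)
print(inactive.count())

# show customer_id next to the names to confirm that duplicates like Smith,Mary
# are distinct customer records rather than repeated rows
for rec in inactive.map(lambda c: (c[0], c[1][0])).sortBy(lambda r: r[1]).take(50):
    print(rec)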


#10

ls /data/retail_db/orders
part-00000

Since the file is in the local file system, I tried the following in the ITVersity lab using pyspark:
ordersRaw = sc.textFile("file:///data/retail_db/orders")

It throws
18/03/29 11:02:03 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 4, wn01.itversity.com): java.io.FileNotFoundException: File file:/data/retail_db/orders/part-00000 does not exist

What am I missing?


#11

Usually, in order to load a file using the code you have pasted, the file has to be present locally on all the nodes in the cluster.

This approach might work on a standalone node, but it is not a recommended way to load a text file from the local file system. Instead, you can copy the file(s) onto HDFS (say, "/user/<your_username>/data/retail_db/orders") and then load them the usual way:

ordersRaw = sc.textFile("/user/<your_username>/data/retail_db/orders")
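For example, the copy can be done from the gateway node before launching pyspark - a sketch, where the HDFS path under your home directory is only an assumption:

# Run these from the shell on the gateway node (not inside pyspark):
#   hdfs dfs -mkdir -p /user/<your_username>/data/retail_db
#   hdfs dfs -put /data/retail_db/orders /user/<your_username>/data/retail_db/orders
#   hdfs dfs -put /data/retail_db/customers /user/<your_username>/data/retail_db/customers

# Then, inside pyspark, load from HDFS as usual
ordersRaw = sc.textFile("/user/<your_username>/data/retail_db/orders")
customersRaw = sc.textFile("/user/<your_username>/data/retail_db/customers")
print(ordersRaw.count())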


#12

Yes, I uploaded the file to HDFS when file:/// failed. Thanks.
Let’s hope they always ask us to load the file from HDFS in the exam!


#13

Hi,
Can anyone post the pyspark code for this? I am able to sort only by customer_lname; I am unable to sort by customer_fname.

Using Spark SQL I got the desired output, but I want it in pyspark. Below is my code. Thanks

orders=open("/data/retail_db/orders/part-00000").read().splitlines()
ordersRDD=sc.parallelize(orders)
customers=open("/data/retail_db/customers/part-00000").read().splitlines()
customersRDD=sc.parallelize(customers)
ordersRDDMap=ordersRDD.map(lambda o: (int(o.split(",")[2]),1))
customersRDDMap = customersRDD.map(lambda c: (int(c.split(",")[0]),(c.split(",")[2],c.split(",")[1])))
customersJoinOrders = customersRDDMap.leftOuterJoin(ordersRDDMap)
customersWOOrders=customersJoinOrders.filter(lambda i: i[1][1] == None)
customersWOOrdersMap = customersWOOrders.map(lambda x: (x[1][0][0],x[1][0][1]))
customersWOOrdersMapSorted = customersWOOrdersMap.sortByKey().map(lambda x: x[0]+","+x[1])
for i in customersWOOrdersMapSorted.take(50): print i

Thanks


#14

orders=open("/data/retail_db/orders/part-00000").read().splitlines()
ordersRDD=sc.parallelize(orders)
customers=open("/data/retail_db/customers/part-00000").read().splitlines()
customersRDD=sc.parallelize(customers)
ordersMap=ordersRDD.map(lambda x: (int(x.split(",")[2]), int(x.split(",")[0])))
customersMap=customersRDD.map(lambda x : (int(x.split(",")[0]),(x.split(",")[2],x.split(",")[1])))
orderCustomerJoin = customersMap.leftOuterJoin(ordersMap)
customersWithNoOrder = orderCustomerJoin.filter(lambda x : x[1][1] == None).map(lambda x : (x[1][0], x[1][0][0]+", "+x[1][0][1])).sortByKey().map(lambda x : x[1])
customersWithNoOrder.saveAsTextFile()


#15

# Exercise 02 - pyspark method

# check the data in the local file system (lfs)
du -sh /data/retail_db

du -sh /data/retail_db/orders

du -sh /data/retail_db/customers

#launch pyspark

pyspark --master yarn \
  --conf spark.ui.port=54213 \
  --num-executors 2 \
  --executor-cores 2 \
  --executor-memory 512M

#load the data from lfs
ordersRaw = open("/data/retail_db/orders/part-00000").read().splitlines()
type(ordersRaw)
ordersRDD = sc.parallelize(ordersRaw)
for i in ordersRDD.take(10): print(i)


# not needed for the solution - just checking the distinct order statuses
ordersRDDStatusDistinct = ordersRDD.map(lambda x: x.split(",")[3]).distinct()
for i in ordersRDDStatusDistinct.collect(): print(i)

customersRaw = open("/data/retail_db/customers/part-00000").read().splitlines()
customersRDD = sc.parallelize(customersRaw)
for i in customersRDD.take(10): print(i)

ordersRDDKeyVal = ordersRDD.map(lambda x: (int(x.split(",")[2]), 1))
for i in ordersRDDKeyVal.take(10): print(i)

customersRDDKeyVal = customersRDD.map(lambda x: (int(x.split(",")[0]), (x.split(",")[2], x.split(",")[1])))
for i in customersRDDKeyVal.take(10): print(i)

ordersJoinCustomers = customersRDDKeyVal.leftOuterJoin(ordersRDDKeyVal)
for i in ordersJoinCustomers.take(100): print(i)

# compare against None (no quotes)
inactiveCustomersSorted = ordersJoinCustomers.filter(lambda x: x[1][1] == None). \
    map(lambda x: ((x[1][0][0], x[1][0][1]), x[1][1])).sortByKey()
for i in inactiveCustomersSorted.take(10): print(i)

# or
inactiveCustomersSorted = ordersJoinCustomers.filter(lambda x: x[1][1] == None). \
    map(lambda x: x[1]).sortByKey()
for i in inactiveCustomersSorted.take(10): print(i)

#saveAsTextFile
inactiveCustomersSortedSaved = inactiveCustomersSorted.map(lambda x: x[0][0] + ", " + x[0][1])
inactiveCustomersSortedSaved.coalesce(1).saveAsTextFile("/user/rjshekar90/solutions/solutions02/inactive_customers")

#run in pyspark
for i in sc.textFile("/user/rjshekar90/solutions/solutions02/inactive_customers/part-00000").take(10): print(i)

hdfs dfs -ls /user/rjshekar90/solutions/solutions02/inactive_customers
#remove existing file
hdfs dfs -rm -R /user/rjshekar90/solutions


#16

You should use sortBy with a composite key:
customerNamesSorted = customerNames.sortBy(lambda x: (x[0], x[1]))
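To put that in the context of the code above (a sketch; customerNames is assumed to hold (customer_lname, customer_fname) tuples as in post #7):

# sortBy with a tuple key sorts by customer_lname first, then customer_fname
customerNamesSorted = customerNames. \
    sortBy(lambda x: (x[0], x[1])). \
    map(lambda x: str(x[0]) + "," + str(x[1]))

for i in customerNamesSorted.take(10): print(i)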


#17

ordersRaw = open("/data/retail_db/orders/part-00000").read().splitlines()
customersRaw = open("/data/retail_db/customers/part-00000").read().splitlines()
ordersRDD = sc.parallelize(ordersRaw)
customersRDD = sc.parallelize(customersRaw)
ordersRddMap = ordersRDD.map(lambda x : (int(x.split(",")[2]),1))
customersRddMap = customersRDD.map(lambda x : ((int(x.split(",")[0])),(x.split(",")[2],x.split(",")[1])))
customersJoinOrders = customersRddMap.leftOuterJoin(ordersRddMap)

customersJoinOrdersFilter = customersJoinOrders.filter(lambda x : x[1][1] == None)
customerNames = customersJoinOrdersFilter.map(lambda x: (x[1][0],1))
customerNamesSorted = customerNames.sortByKey().map(lambda x: str(x[0][0])+","+str(x[0][1]))

customerNamesSorted.saveAsTextFile('')


#18

ordersRaw = open("/data/retail_db/orders/part-00000").read().splitlines()
ordersRD = sc.parallelize(ordersRaw)

from pyspark.sql import Row

ordersDF = ordersRD.map(lambda o: Row(order_customer_id = int(o.split(",")[2]))).toDF()

ordersDF.registerTempTable("orders")

sqlContext.sql("select count(distinct order_customer_id) as Count from orders").collect()

customersRaw = open("/data/retail_db/customers/part-00000").read().splitlines()
customersRD = sc.parallelize(customersRaw)
customersRD = customersRD. \
    map(lambda c: Row(customer_id = int(c.split(",")[0]), customer_lname = c.split(",")[2], customer_fname = c.split(",")[1])). \
    toDF()

customersRD.registerTempTable("customers")

sqlContext.sql("select customer_id, customer_lname, customer_fname from customers").show()

resultDF = sqlContext.sql("""
    select customer_lname, customer_fname
    from customers c left outer join orders o
    on c.customer_id = o.order_customer_id
    where o.order_customer_id is null
    order by customer_lname, customer_fname
""")

resultDF.map(lambda r: r[0] + ", " + r[1]).coalesce(1).saveAsTextFile("/user/smakireddy/solutions/DF-2/inactive_customers")


#19

In the problem statement, it is mentioned that the output should be stored as a single file. Is there a way to do that?


#20

I got it. You have to use coalesce(1).
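For example (a sketch; resultRDD stands for whichever RDD of output lines you ended up with):

# coalesce(1) brings everything into a single partition,
# so saveAsTextFile writes exactly one part file
resultRDD.coalesce(1).saveAsTextFile("/user/<YOUR_USER_ID>/solutions/solutions02/inactive_customers")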


#21

This is the only complete answer till now. :+1: