Broadcast variable in pyspark


#1

Hi, please refer to the code below, as explained by Durga sir in one of his videos on how to use a broadcast variable.


from pyspark import SparkConf, SparkContext
import sys

conf = SparkConf().setAppName("Orders Join Order Items").setMaster(sys.argv[1])
sc = SparkContext(conf=conf)

#Reading the data
inputPath = sys.argv[2]
orders = sc.textFile(inputPath + "orders")
orderItems = sc.textFile(inputPath + "order_items")

Join orders, order_items, and customers.

To join, the records first need to be converted into key-value tuples.

First, work on joining orders and order_items:

ordersMap = orders.map(lambda o: (int(o.split(",")[0]), (o.split(",")[1], int(o.split(",")[2]))))

orderItemsMap = orderItems.map(lambda oi: (int(oi.split(",")[1]), float(oi.split(",")[4])))
ordersJoin = ordersMap.join(orderItemsMap)
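To see the tuple shapes the two map() calls produce, here is a minimal pure-Python sketch of the same key extraction (no Spark needed); the sample lines use made-up values in the retail_db column layout:

```python
# Illustrative only: one made-up order line and order_item line
order_line = "1,2013-07-25 00:00:00.0,11599,CLOSED"
order_item_line = "1,1,957,1,299.98,299.98"

# Same extraction as ordersMap: (order_id, (order_date, customer_id))
o = order_line.split(",")
order_tuple = (int(o[0]), (o[1], int(o[2])))

# Same extraction as orderItemsMap: (order_id, subtotal)
oi = order_item_line.split(",")
order_item_tuple = (int(oi[1]), float(oi[4]))

print(order_tuple)       # (1, ('2013-07-25 00:00:00.0', 11599))
print(order_item_tuple)  # (1, 299.98)
```

Because both tuples share order_id as the key, join() can pair them into (order_id, ((order_date, customer_id), subtotal)).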

Get revenue per day per customer id

Read customers to get customer details and broadcast

customersPath = sys.argv[3]
customers = open(customersPath).read().splitlines()
customersMap = dict(map(lambda c: (int(c.split(",")[0]),(c.split(",")[1]) + " " + (c.split(",")[2])), customers))
customersBV = sc.broadcast(customersMap)
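The dict that gets broadcast maps customer_id to "first_name last_name". A small standalone sketch of that construction, using two made-up customer lines:

```python
# Illustrative only: made-up lines in the customers layout (id,first_name,last_name,...)
customers = ["1,Richard,Hernandez", "2,Mary,Barrett"]

# Same logic as customersMap: {customer_id: "first_name last_name"}
customersMap = dict(
    map(lambda c: (int(c.split(",")[0]),
                   c.split(",")[1] + " " + c.split(",")[2]),
        customers)
)
print(customersMap)  # {1: 'Richard Hernandez', 2: 'Mary Barrett'}
```

Broadcasting this dict ships one read-only copy to each executor, so every task can look up a name by customer_id without a shuffle.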

for i in ordersJoin.take(10): print(i)

revenuePerDatePerCustId = ordersJoin. \
    map(lambda o: ((o[1][0][0], customersBV.value[o[1][0][1]]), o[1][1])). \
    reduceByKey(lambda t, v: t + v)

revenuePerDatePerCustId. \
    map(lambda rec: rec[0][0] + "\t" + rec[0][1] + "\t" + str(rec[1])). \
    saveAsTextFile(sys.argv[4])
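The lookup-and-sum step can be simulated without Spark. A sketch on two made-up joined records shaped like the ordersJoin output, i.e. (order_id, ((date, customer_id), subtotal)):

```python
# Illustrative only: made-up broadcast dict and joined records
customersBV_value = {11599: "Richard Hernandez"}
ordersJoin = [
    (1, (("2013-07-25", 11599), 299.98)),
    (2, (("2013-07-25", 11599), 129.99)),
]

# map step: re-key each record as ((date, customer_name), subtotal)
mapped = [((o[1][0][0], customersBV_value[o[1][0][1]]), o[1][1]) for o in ordersJoin]

# reduceByKey equivalent: sum subtotals per (date, customer_name) key
totals = {}
for k, v in mapped:
    totals[k] = totals.get(k, 0) + v

print(totals)
```

Note that the aggregation key here is (date, customer_name), not (date, customer_id); the broadcast lookup happens before reduceByKey.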

Final Output: date(tab)customer name(tab)revenue

The customer name is computed by concatenating first name and last name.


My question: if we don't use a broadcast variable, we instead need to apply the appropriate map() and reduceByKey() to get revenue per date per customer_id, and then join that result with the customers data using customer_id as the join key.

Should this give the correct answer?
When I executed it this way, it gave me the same total count, but there is a mismatch when the individual count for a particular customer/date is considered.
Could anyone please help me correct this?

Thanks in advance,
Aparna


#2

I do not understand the question. You need to share the details about the mismatch you are seeing.


#3

Please check my code below and let me know if it is wrong.

I joined the revenue-per-date-per-customer_id output with the customers output using customer_id as the join key.
I didn't use a broadcast variable in my code.

Created RDDs for the three tables:

orders = sc.textFile("/user/aparna149/aparna/orders")
orderItems = sc.textFile("/user/aparna149/aparna/order_items")
customers = sc.textFile("/user/aparna149/aparna/customers")

Joined orders and order_items by order_id:

ordersMap = orders.map(lambda x:(int(x.split(",")[0]), (x.split(",")[1],int(x.split(",")[2]))))
orderItemsMap = orderItems.map(lambda x:(int(x.split(",")[1]), float(x.split(",")[4])))
ordersJoin=ordersMap.join(orderItemsMap)

Excluded the order_id column:

ordersJoinMap = ordersJoin.map(lambda x: x[1])

Totaled the revenue per date per customer_id:

RevPerDatePerCust = ordersJoinMap.reduceByKey(lambda x,y:x+y)

Mapped with customer_id as the key to join:

RevWithCustId = RevPerDatePerCust.map(lambda x:( x[0][1],(x[0][0],x[1])))

Customers data with customer_id as the key to join:

CustomerMap = customers.map(lambda x: (int(x.split(",")[0]), x.split(",")[1] + " " + x.split(",")[2]))

Joined the two datasets by means of customer_id:

CustomersJoin = RevWithCustId.join(CustomerMap)
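To make the record shapes in this join-based approach concrete, here is a pure-Python simulation on made-up data (the values are illustrative, not from the actual dataset):

```python
# Illustrative only: one aggregated record and one customer record
# RevWithCustId records: (customer_id, (date, revenue))
RevWithCustId = [(11599, ("2013-07-25", 429.97))]
# CustomerMap records: (customer_id, "first_name last_name")
CustomerMap = [(11599, "Richard Hernandez")]

# join() equivalent: match records sharing the same customer_id key,
# producing (customer_id, ((date, revenue), name))
cust = dict(CustomerMap)
CustomersJoin = [(cid, (rev, cust[cid])) for cid, rev in RevWithCustId if cid in cust]

print(CustomersJoin)  # [(11599, (('2013-07-25', 429.97), 'Richard Hernandez'))]
```

Note one structural difference from the broadcast version: here reduceByKey runs on the (date, customer_id) key before the name lookup, whereas the broadcast version aggregates on (date, customer_name) after the lookup; when comparing outputs record by record, it helps to account for that difference in key.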

Please let me know if I am wrong in my logic/code.

Thanks in advance,
Aparna