# My Request to Everyone

#1

Hello all,

My request to everyone who has raised issues in this community: please reply to say whether your issue is resolved or still exists. That way, other community members can also learn from your issues and their resolutions.

Thank you
Venkat

#2

Hi Venkat,
I still have one unresolved issue.

Thanks
Aparna

#3

Thanks
Venkat

#4

Hi Venkat,

The problem statement is to get daily revenue per customer name.
Tables to be used: orders, order_items, customers
I tried two methods for this.

In my first approach, I used a broadcast variable for customers (with customer_id and customer_name). With this approach, the final count I get is 47654.

In my second approach, I joined the customers table with the joined result set of orders and order_items to replace the customer_id with the customer_name, but in this case the final count I get is 57047.

I think the count should be the same in both cases.
Please look at my code and let me know where I am going wrong.

## Get Daily revenue per customer: (using broadcast variable)

#Joining orders and order_items -

orders = sc.textFile("/user/aparna149/aparna/orders")
orderItems = sc.textFile("/user/aparna149/aparna/order_items")
ordersMap = orders.map(lambda x:(int(x.split(",")[0]),(x.split(",")[1],int(x.split(",")[2]))))
orderItemsMap = orderItems.map(lambda x: (int(x.split(",")[1]), float(x.split(",")[4])))
ordersJoinOrderItems = ordersMap.join(orderItemsMap)
for i in ordersJoinOrderItems.take(20): print(i)

ordersJoinOrderItemsMap = ordersJoinOrderItems.map(lambda x: x[1])
for i in ordersJoinOrderItemsMap.take(20): print(i)

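#Assumed, not shown in the original post: customers is loaded as a local list of lines -
customers = sc.textFile("/user/aparna149/aparna/customers").collect()
customersMap = dict(map(lambda c: (int(c.split(",")[0]), c.split(",")[1] + " " + c.split(",")[2]), customers))
#Assumed, not shown in the original post: the dict is broadcast before it is used below -
customersBV = sc.broadcast(customersMap)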

#Substituting customer_id by customer_name -

RevenueCustomerMap = ordersJoinOrderItemsMap.map(lambda x:((x[0][0], customersBV.value[x[0][1]]),x[1]))
for i in RevenueCustomerMap.take(20): print(i)

RevenuePerDatePerCustomer = RevenueCustomerMap.reduceByKey(lambda x, y: x+y)
for i in RevenuePerDatePerCustomer.take(20): print(i)

FinalRevenueOutput = RevenuePerDatePerCustomer.map(lambda x: x[0][0] + "\t" + x[0][1] + "\t" + str(x[1]))

## Get Daily revenue per customer: (using join)

orders = sc.textFile("/user/aparna149/aparna/orders")
orderItems = sc.textFile("/user/aparna149/aparna/order_items")
customers = sc.textFile("/user/aparna149/aparna/customers")
ordersMap = orders.map(lambda x:(int(x.split(",")[0]),(x.split(",")[1],int(x.split(",")[2]))))
orderItemsMap = orderItems.map(lambda x: (int(x.split(",")[1]), float(x.split(",")[4])))
ordersJoinOrderItems = ordersMap.join(orderItemsMap)
for i in ordersJoinOrderItems.take(20): print(i)

ordersJoinOrderItemsMap = ordersJoinOrderItems.map(lambda x: x[1])
for i in ordersJoinOrderItemsMap.take(20): print(i)

DailyRevenuePerCustomer = ordersJoinOrderItemsMap.reduceByKey(lambda x,y: x+y)
for i in DailyRevenuePerCustomer.take(20): print(i)

DailyRevenuePerCustomerMap = DailyRevenuePerCustomer.map(lambda x: (x[0][1],(x[0][0], x[1])))
customersMap = customers.map(lambda x:(int(x.split(",")[0]),(x.split(",")[1],x.split(",")[2])))
customersJoinDailyRevenue = customersMap.join(DailyRevenuePerCustomerMap)
for i in customersJoinDailyRevenue.take(20): print(i)

RevenuePerCustomerName = customersJoinDailyRevenue.map(lambda x: x[1][0][0] + " " + x[1][0][1] + "\t" + x[1][1][0] + "\t" + str(x[1][1][1]))

It would be a great help if I could find out what the issue is.
Aparna

#5

Hello Aparna

I found the problem statement interesting and attempted it using Spark SQL. Here is my solution. Can you let me know whether it solves the use case of the problem?

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Get Daily Revenue Per Customer Name

object test {

  val conf = new SparkConf().setAppName("test").setMaster("local")
  val sc = new SparkContext(conf)

  val sqlContext = new SQLContext(sc)

  import sqlContext.implicits._

  def main(args: Array[String]): Unit = {
    val orders = sc.textFile("/home/varunu28/data/retail_db/orders")
    val order_items = sc.textFile("/home/varunu28/data/retail_db/order_items")
    val customers = sc.textFile("/home/varunu28/data/retail_db/customers")

    // Keep only the date part of the order timestamp.
    val ordersDF = orders.
      map(rec => (rec.split(",")(0).toInt, rec.split(",")(1).split(" ")(0), rec.split(",")(2).toInt)).
      toDF("order_id", "order_date", "order_customer_id")

    // Assumed layout: field 0 = order_item_id, field 1 = order_item_order_id
    // (the join key used in the PySpark code in this thread), field 4 = subtotal.
    val order_itemsDF = order_items.
      map(rec => (rec.split(",")(0).toInt, rec.split(",")(1).toInt, rec.split(",")(4).toFloat)).
      toDF("order_item_id", "order_item_order_id", "order_item_subtotal")

    // Build the full customer name from first and last name.
    val customersDF = customers.
      map(rec => (rec.split(",")(0).toInt, rec.split(",")(1) + " " + rec.split(",")(2))).
      toDF("customer_id", "customer_name")

    ordersDF.registerTempTable("orders")
    order_itemsDF.registerTempTable("order_items")
    customersDF.registerTempTable("customers")

    // Count the (order_date, customer_name) groups.
    val res = sqlContext.sql("SELECT order_date, customer_name, ROUND(SUM(order_item_subtotal), 2) AS revenue " +
      "FROM orders JOIN order_items ON orders.order_id=order_items.order_item_order_id " +
      "JOIN customers ON orders.order_customer_id=customers.customer_id " +
      " GROUP BY order_date, customer_name " +
      "ORDER BY order_date").count()

    println(res)
  }

}
```

#6

Hi Varun,
Thank you very much for your help. I really appreciate it.
Although I don't know Scala, I tried your code in PySpark and got a count of 47654, which exactly matches my first result.
I am still curious, though, about what is wrong with my second approach.
Could you please look at the second piece of code again and let me know?

Thanks,
Aparna

#7

REF: Get Daily revenue per customer: (using join)

All your steps are correct except the last one:
In that step you should use straight double quotes (" ") instead of curly quotes (“ ”).
Now check the count. It is 57047.
Proof of my execution is below, along with sample output data.

RevenuePerCustomerName = customersJoinDailyRevenue.map(lambda x: (x[1][0][0]+ " “+ x[1][0][1] + “\t” + x[1][1][0] +”\t" +str(x[1][1][1])))

My RDD:
RevenuePerCustomerName = customersJoinDailyRevenue.map(lambda x: (x[1][0][0] + " " + x[1][0][1] + "\t" + x[1][1][0] + "\t" + str(x[1][1][1])))

###########################################################################

orders = sc.textFile("/user/vanampudi/retail_db/orders")
order_items = sc.textFile("/user/vanampudi/retail_db/order_items")
customers = sc.textFile("/user/vanampudi/retail_db/customers")

ordersMap = orders.map(lambda x:(int(x.split(",")[0]),(x.split(",")[1],int(x.split(",")[2]))))
ordersMap.first()
(1, (u'2013-07-25 00:00:00.0', 11599))

orderItemsMap = order_items.map(lambda x: (int(x.split(",")[1]), float(x.split(",")[4])))
orderItemsMap.first()
(1, 299.98)

ordersJoinOrderItems = ordersMap.join(orderItemsMap)
for i in ordersJoinOrderItems.take(10):print(i)

(32768, ((u'2014-02-12 00:00:00.0', 1900), 199.99))
(32768, ((u'2014-02-12 00:00:00.0', 1900), 129.99))
(32768, ((u'2014-02-12 00:00:00.0', 1900), 299.98))
(32768, ((u'2014-02-12 00:00:00.0', 1900), 399.98))
(49152, ((u'2014-05-27 00:00:00.0', 9778), 299.98))
(4, ((u'2013-07-25 00:00:00.0', 8827), 49.98))
(4, ((u'2013-07-25 00:00:00.0', 8827), 299.95))
(4, ((u'2013-07-25 00:00:00.0', 8827), 150.0))
(4, ((u'2013-07-25 00:00:00.0', 8827), 199.92))
(50192, ((u'2014-06-04 00:00:00.0', 1083), 129.99))

ordersJoinOrderItemsMap = ordersJoinOrderItems.map(lambda x: x[1])
for i in ordersJoinOrderItemsMap.take(20): print(i)
((u'2014-02-12 00:00:00.0', 1900), 199.99)
((u'2014-02-12 00:00:00.0', 1900), 129.99)
((u'2014-02-12 00:00:00.0', 1900), 299.98)
((u'2014-02-12 00:00:00.0', 1900), 399.98)
((u'2014-05-27 00:00:00.0', 9778), 299.98)
((u'2013-07-25 00:00:00.0', 8827), 49.98)
((u'2013-07-25 00:00:00.0', 8827), 299.95)
((u'2013-07-25 00:00:00.0', 8827), 150.0)
((u'2013-07-25 00:00:00.0', 8827), 199.92)
((u'2014-06-04 00:00:00.0', 1083), 129.99)

DailyRevenuePerCustomer = ordersJoinOrderItemsMap.reduceByKey(lambda x,y: x+y)
for i in DailyRevenuePerCustomer.take(10): print(i)
((u'2013-11-15 00:00:00.0', 915), 329.98)
((u'2014-04-18 00:00:00.0', 9415), 49.98)
((u'2013-11-21 00:00:00.0', 9970), 499.97)
((u'2014-02-02 00:00:00.0', 9812), 179.97)
((u'2013-11-07 00:00:00.0', 4486), 149.94)
((u'2013-08-15 00:00:00.0', 4417), 759.8500000000001)
((u'2014-06-21 00:00:00.0', 3949), 1029.87)
((u'2013-10-02 00:00:00.0', 8180), 979.86)
((u'2014-07-08 00:00:00.0', 6493), 549.88)
((u'2014-02-12 00:00:00.0', 6815), 549.94)

DailyRevenuePerCustomerMap = DailyRevenuePerCustomer.map(lambda x: (x[0][1],(x[0][0], x[1])))
for i in DailyRevenuePerCustomerMap.take(10):print(i)
(915, (u'2013-11-15 00:00:00.0', 329.98))
(9415, (u'2014-04-18 00:00:00.0', 49.98))
(9970, (u'2013-11-21 00:00:00.0', 499.97))
(9812, (u'2014-02-02 00:00:00.0', 179.97))
(4486, (u'2013-11-07 00:00:00.0', 149.94))
(4417, (u'2013-08-15 00:00:00.0', 759.8500000000001))
(3949, (u'2014-06-21 00:00:00.0', 1029.87))
(8180, (u'2013-10-02 00:00:00.0', 979.86))
(6493, (u'2014-07-08 00:00:00.0', 549.88))
(6815, (u'2014-02-12 00:00:00.0', 549.94))

customersMap = customers.map(lambda x:(int(x.split(",")[0]),(x.split(",")[1],x.split(",")[2])))
for i in customersMap.take(10):print(i)
(1, (u'Richard', u'Hernandez'))
(2, (u'Mary', u'Barrett'))
(3, (u'Ann', u'Smith'))
(4, (u'Mary', u'Jones'))
(5, (u'Robert', u'Hudson'))
(6, (u'Mary', u'Smith'))
(7, (u'Melissa', u'Wilcox'))
(8, (u'Megan', u'Smith'))
(9, (u'Mary', u'Perez'))
(10, (u'Melissa', u'Smith'))

customersJoinDailyRevenue = customersMap.join(DailyRevenuePerCustomerMap)
for i in customersJoinDailyRevenue.take(10):print(i)
(8196, ((u'Kimberly', u'Sheppard'), (u'2013-11-01 00:00:00.0', 829.97)))
(8196, ((u'Kimberly', u'Sheppard'), (u'2013-09-06 00:00:00.0', 119.98)))
(8196, ((u'Kimberly', u'Sheppard'), (u'2013-10-13 00:00:00.0', 119.98)))
(8196, ((u'Kimberly', u'Sheppard'), (u'2014-03-22 00:00:00.0', 969.9200000000001)))
(8196, ((u'Kimberly', u'Sheppard'), (u'2013-08-24 00:00:00.0', 499.97)))
(8196, ((u'Kimberly', u'Sheppard'), (u'2014-02-25 00:00:00.0', 129.99)))
(8196, ((u'Kimberly', u'Sheppard'), (u'2013-12-05 00:00:00.0', 949.96)))
(6, ((u'Mary', u'Smith'), (u'2014-02-13 00:00:00.0', 649.97)))
(6, ((u'Mary', u'Smith'), (u'2013-09-09 00:00:00.0', 1049.84)))
(6, ((u'Mary', u'Smith'), (u'2013-09-10 00:00:00.0', 629.82)))

RevenuePerCustomerName = customersJoinDailyRevenue.map(lambda x: (x[1][0][0] + " " + x[1][0][1] + "\t" + x[1][1][0] + "\t" + str(x[1][1][1])))
for i in RevenuePerCustomerName.take(10):print(i)
Kimberly Sheppard 2013-11-01 00:00:00.0 829.97
Kimberly Sheppard 2013-09-06 00:00:00.0 119.98
Kimberly Sheppard 2013-10-13 00:00:00.0 119.98
Kimberly Sheppard 2014-03-22 00:00:00.0 969.92
Kimberly Sheppard 2013-08-24 00:00:00.0 499.97
Kimberly Sheppard 2014-02-25 00:00:00.0 129.99
Kimberly Sheppard 2013-12-05 00:00:00.0 949.96
Mary Smith 2014-02-13 00:00:00.0 649.97
Mary Smith 2013-09-09 00:00:00.0 1049.84
Mary Smith 2013-09-10 00:00:00.0 629.82

RevenuePerCustomerName.count() = 57047
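
A note on why 57047 may differ from 47654: the join version above reduces on (order_date, customer_id) and attaches names afterwards, while the broadcast version replaces the id with the name first and then reduces on (order_date, customer_name). If several customer_ids share one full name, the name-keyed result has fewer rows. This is a likely explanation, not something confirmed in this thread. A minimal plain-Python sketch with made-up rows (the id 42, the dates, the amounts and the helper `revenue_by` are all hypothetical):

```python
from collections import defaultdict

# Toy rows of (order_date, customer_id, subtotal); values are made up for illustration.
order_rows = [
    ("2013-09-09", 6, 100.0),
    ("2013-09-09", 6, 50.0),
    ("2013-09-09", 42, 25.0),  # hypothetical second customer with the same name
    ("2013-09-10", 6, 10.0),
]
# Hypothetical lookup table: ids 6 and 42 share one full name.
customer_names = {6: "Mary Smith", 42: "Mary Smith"}


def revenue_by(rows, key_fn):
    """Sum subtotals per key, similar in spirit to reduceByKey(lambda x, y: x + y)."""
    totals = defaultdict(float)
    for date, cust_id, subtotal in rows:
        totals[key_fn(date, cust_id)] += subtotal
    return dict(totals)


# Broadcast-style: the name replaces the id BEFORE the reduce.
by_name = revenue_by(order_rows, lambda d, c: (d, customer_names[c]))
# Join-style: reduce on the id first; names would be attached afterwards.
by_id = revenue_by(order_rows, lambda d, c: (d, c))

print(len(by_name), len(by_id))
```

With this toy input the name-keyed result has 2 rows and the id-keyed result has 3. Which count is the intended one depends on whether two customers with the same name should be reported separately.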

Thanks
Venkat

#8

@Aparana Sen:

This is a good and interesting issue. We have to review whether a lookup with a broadcast variable gives the same result as a join.
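
One way to start that review: for a given customer_id, a dictionary lookup and an inner join should both return the same name, so a difference in counts would have to come from somewhere else, for example from several customer_ids sharing one full name. Below is a minimal plain-Python sketch for checking that, assuming customer lines of the form `id,first_name,last_name,...`. The helper `shared_names` and the row with id 99999 are hypothetical; in PySpark the input could be the collected lines of the customers file.

```python
from collections import defaultdict


def shared_names(customer_lines):
    """Return {full_name: sorted ids} for names used by more than one customer_id."""
    ids_by_name = defaultdict(set)
    for line in customer_lines:
        fields = line.split(",")
        full_name = fields[1] + " " + fields[2]
        ids_by_name[full_name].add(int(fields[0]))
    return {name: sorted(ids) for name, ids in ids_by_name.items() if len(ids) > 1}


sample = [
    "6,Mary,Smith",
    "8,Megan,Smith",
    "10,Melissa,Smith",
    "99999,Mary,Smith",  # hypothetical row, added only to show a collision
]
print(shared_names(sample))
```

If the result is non-empty for the real customers data, grouping by name and grouping by id will produce different row counts.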

Thanks
Venkat

#9

Hi Venkat,
Thanks a lot for your time and effort. I think you have understood my issue: why there is a mismatch in the count.
I am sure it will be resolved soon.

Thanks
Aparna

#10