Develop Application - Get the monthly revenue for each customer (Exercise)

Description

Develop an application using Scala, Python, or Spark SQL to get the monthly revenue for each customer on the Databricks platform.

Instructions

  • Create the required tables or Data Frames based on the data model diagram below.
  • Consider only COMPLETE and CLOSED orders, plus all PENDING orders. You need to analyze the data to identify every PENDING status.
  • Save the final output in Parquet format to the folder /FileStore/tables/retail_db/customer_revenue_monthly. The output columns should be order_month (in YYYYMM format), then customer_name, and finally revenue.
  • Sort the data in ascending order by month and in descending order by revenue.
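The rules above can be sketched on a handful of rows in plain Python before writing the Spark version. The sample rows, the `is_included` helper, and the `monthly_revenue` function are hypothetical, and the sketch assumes that "all PENDING orders" means every status starting with PENDING.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical sample rows: (order_date, customer_name, order_status, subtotal)
rows = [
    ("2013-07-25", "Mary Smith", "COMPLETE", 100.0),
    ("2013-07-26", "Mary Smith", "PENDING_PAYMENT", 50.0),
    ("2013-07-27", "John Doe", "CLOSED", 300.0),
    ("2013-07-28", "John Doe", "CANCELED", 999.0),  # excluded by the status rule
    ("2013-08-01", "Mary Smith", "PENDING", 75.0),
]

def is_included(status):
    # Assumption: COMPLETE, CLOSED, and any status that starts with PENDING
    return status in ("COMPLETE", "CLOSED") or status.startswith("PENDING")

def monthly_revenue(rows):
    totals = defaultdict(float)
    for order_date, customer_name, status, subtotal in rows:
        if is_included(status):
            # "%Y%m" zero-pads the month, so July 2013 becomes 201307
            parsed = datetime.strptime(order_date, "%Y-%m-%d")
            order_month = int(parsed.strftime("%Y%m"))
            totals[(order_month, customer_name)] += subtotal
    # Month ascending, then revenue descending within each month
    return sorted(
        ((month, name, revenue) for (month, name), revenue in totals.items()),
        key=lambda r: (r[0], -r[2]),
    )

result = monthly_revenue(rows)
for row in result:
    print(row)
```

Each tuple follows the requested column order: order_month, customer_name, revenue.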

Here is the Data Model for the tables provided by Cloudera as part of their exercises.


from pyspark.sql.functions import *

# Read each table with an explicit schema
orders = spark.read.csv("/FileStore/tables/retail_db/orders/part_00000-6a99e",
    schema="order_id int, order_date timestamp, customer_id int, order_status string")

orderItems = spark.read.csv("/FileStore/tables/retail_db/order_items/part_00000-6a99e",
    schema="""order_item_id int,
    order_item_order_id int,
    order_item_product_id int,
    order_item_quantity int,
    order_item_subtotal float,
    order_item_product_price float""")

customers = spark.read.csv("/FileStore/tables/retail_db/customers/part_00000-6a99e",
    schema="""customer_id int,
    customer_fname string,
    customer_lname string,
    customer_email string,
    customer_password string,
    customer_address string,
    customer_city string,
    customer_state string,
    customer_pincode int""")

# Keep only COMPLETE, CLOSED, and PENDING (PENDING, PENDING_PAYMENT) orders
orders_cp = orders.filter("order_status in ('PENDING', 'PENDING_PAYMENT', 'COMPLETE', 'CLOSED')")

# Join orders to customers and order items; date_format zero-pads the month (yyyyMM)
cust_orders = (orders_cp
    .join(customers, "customer_id")
    .join(orderItems, orders_cp.order_id == orderItems.order_item_order_id)
    .select(
        "customer_id",
        concat_ws(" ", "customer_fname", "customer_lname").alias("customer_fullname"),
        date_format("order_date", "yyyyMM").cast("int").alias("YYYYMM"),
        "order_item_subtotal"))

# Total revenue per customer name and month, sorted by month (ascending) and revenue (descending)
monthly_revenue = (cust_orders
    .groupBy("customer_fullname", "YYYYMM")
    .agg(sum("order_item_subtotal").alias("monthly_revenue"))
    .sort("YYYYMM", desc("monthly_revenue")))

monthly_revenue.coalesce(2).write.parquet("/FileStore/tables/retail_db/output/customer_revenue_monthly")

Result

+-----------------+------+------------------+
|customer_fullname|YYYYMM|   monthly_revenue|
+-----------------+------+------------------+
|       Mary Smith|201307| 85669.48168373108|
|    William Smith|201307| 4794.450088500977|
|       Mark Smith|201307| 4665.380086898804|
|      David Smith|201307|  4584.67008972168|
|     Robert Smith|201307|  4199.48006439209|
|       John Smith|201307|3537.6501064300537|
|    Jessica Smith|201307| 3259.740062713623|
|      Tyler Smith|201307| 3179.670066833496|
|       Jose Smith|201307|2924.5900440216064|
|    Mary Reynolds|201307|2899.7600708007812|
|        Mary Hall|201307|2789.5600204467773|
|       Mary Davis|201307|2749.4699935913086|
|     Joseph Smith|201307| 2499.630039215088|
|       Sara Smith|201307|2409.6700439453125|
|Christopher Smith|201307| 2402.560001373291|
|     Joshua Smith|201307| 2331.730028152466|
|  Elizabeth Smith|201307|2329.6500396728516|
|      Karen Smith|201307|2279.8400497436523|
|      Helen Smith|201307|2269.6000328063965|
|      Peter Smith|201307| 2255.710060119629|
+-----------------+------+------------------+
only showing top 20 rows
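
One thing to keep in mind when reading this output: the aggregation is keyed on the customer's full name, so customers who share a first and last name are combined into a single row. Whether that explains the large Mary Smith total is not verified here. The plain Python sketch below uses hypothetical ids and amounts, and a hypothetical `revenue_by` helper, to show how grouping by name differs from grouping by customer id plus name.

```python
from collections import defaultdict

# Hypothetical rows: (customer_id, customer_fullname, order_month, subtotal)
rows = [
    (1, "Mary Smith", 201307, 100.0),
    (2, "Mary Smith", 201307, 250.0),  # a different customer with the same name
    (3, "John Smith", 201307, 80.0),
]

def revenue_by(rows, key):
    # Sum the subtotal (last field) for each distinct key value
    totals = defaultdict(float)
    for row in rows:
        totals[key(row)] += row[3]
    return dict(totals)

# Keyed on name and month: the two Mary Smiths collapse into one row
by_name = revenue_by(rows, key=lambda r: (r[1], r[2]))

# Keyed on id, name, and month: each customer keeps a separate row
by_customer = revenue_by(rows, key=lambda r: (r[0], r[1], r[2]))

print(by_name)
print(by_customer)
```

If one row per customer is wanted, the equivalent change in the PySpark code would likely be to add customer_id to the groupBy columns.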