Solution for the below Spark Problems


#1

Does anyone have a solution for the problem below?

Get the total number of orders for each customer where customer_state = "TX".


Learn Spark 1.6.x or Spark 2.x on our state of the art big data labs

  • Click here for access to state of the art 13 node Hadoop and Spark Cluster


#2

@tejasvini

In Spark-SQL:

// Spark-SQL approach (run in spark-shell, where `sc` and `sqlContext` are predefined).
// Goal: number of orders per customer, restricted to customers whose state is 'TX'.

// Read both data sets as comma-separated text and split every line into its fields.
val ordersRDD = sc.textFile("/public/retail_db/orders/part-00000").map(_.split(","))
val customersRDD = sc.textFile("/public/retail_db/customers/part-00000").map(_.split(","))

// Keep only the fields needed: field 0 and field 2 of an order record.
val ordersDF = ordersRDD.map(order => (order(0).toInt, order(2).toInt)).toDF("order_id", "order_customer_id")
ordersDF.registerTempTable("order_temp")

// Keep only the fields needed: field 0 and field 7 of a customer record.
val customersDF = customersRDD.map(customer => (customer(0).toInt, customer(7))).toDF("customer_id", "customer_state")
customersDF.registerTempTable("customer_temp")

// Attach the customer's state to every order.
val joinedDF = sqlContext.sql("SELECT order_id, order_customer_id, customer_state FROM order_temp JOIN customer_temp ON order_customer_id=customer_id")
joinedDF.registerTempTable("joined_temp")

// Count orders per customer for 'TX' only, largest counts first.
val reducedDF = sqlContext.sql("SELECT order_customer_id, count(order_id) as orders_count from joined_temp where customer_state='TX' group by order_customer_id order by orders_count desc")

In Spark-Core

// Spark-Core approach (run in spark-shell, where `sc` is predefined).
// Goal: number of orders per customer, restricted to customers whose state is "TX".

// Split every order line into fields and key each order by field 2 (the customer id
// used for the join below), keeping field 0 as the value.
val ordersRDD = sc.textFile("/public/retail_db/orders/part-00000").map(_.split(","))
val filteredOrdersRDD = ordersRDD.map(order => (order(2).toInt, order(0).toInt))

// Split every customer line into fields, build (customer id, state) pairs from
// fields 0 and 7, and keep only the customers whose state is "TX".
val customersRDD = sc.textFile("/public/retail_db/customers/part-00000").map(_.split(","))
val filteredCustomersRDD = customersRDD.map(customer => (customer(0).toInt, customer(7))).filter(customer => customer._2 == "TX")

// Join on customer id; only orders that belong to a "TX" customer survive the join.
val joinedRDD = filteredOrdersRDD.join(filteredCustomersRDD)

// Emit (customer id, 1) per joined order, sum the ones per customer,
// then sort by the order count in descending order.
val finalRDD = joinedRDD.map(order => (order._1, 1)).reduceByKey(_ + _).sortBy(_._2, false)
finalRDD.take(10).foreach(println)


#3

// Spark-SQL approach that also reports the customer's name
// (run in spark-shell, where `sc` and `sqlContext` are predefined).

// Orders: split each line once, then keep fields 0 and 2.
val orders = sc.textFile("/public/retail_db/orders")
val ordersDF = orders.map(_.split(",")).map(o => (o(0).toInt, o(2).toInt)).
toDF("order_id", "order_customer_id")
ordersDF.registerTempTable("orders")

// Customers: split each line once, then keep fields 0, 1, 2 and 7.
val customers = sc.textFile("/public/retail_db/customers")
val customersDF = customers.map(_.split(",")).map(c => (c(0).toInt, c(1), c(2), c(7))).toDF("customer_id", "cust_fname", "cust_lname", "cust_state")
customersDF.registerTempTable("customers")

// Sanity check: list the tables registered with this SQL context.
sqlContext.sql("show tables").show()

// Count orders per 'TX' customer, largest counts first.
val totalorders = sqlContext.sql("select customer_id, cust_fname, cust_lname, count(order_id) order_count from customers join orders on customer_id = order_customer_id where cust_state = 'TX' group by customer_id, cust_fname, cust_lname order by order_count desc")