Problem 2 - Aggregations using Spark


#1

Originally published at: http://www.itversity.com/lessons/problem-2-aggregations-using-spark/

Aggregations can be done using the Core Spark API, Data Frame operations, or Spark SQL. Problem Statement: Using sqoop, copy the data available in the mysql products table to the folder /user/cloudera/products on HDFS as a text file; columns should be delimited by pipe ‘|’. Move all the files from the /user/cloudera/products folder to the /user/cloudera/problem2/products folder. Change permissions…
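The sqoop import and HDFS move described in the statement could be sketched as follows. This is only an outline: the database name (`retail_db`), username, and password are placeholders from a typical Cloudera quickstart setup, not details given in the thread.

```shell
# Import the mysql products table as pipe-delimited text files.
# Connection details below are assumptions, not from the problem statement.
sqoop import \
  --connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
  --username retail_dba \
  --password cloudera \
  --table products \
  --target-dir /user/cloudera/products \
  --as-textfile \
  --fields-terminated-by '|'

# Move the imported files into the problem2 folder
hdfs dfs -mkdir -p /user/cloudera/problem2
hdfs dfs -mv /user/cloudera/products /user/cloudera/problem2/products
```

The `--fields-terminated-by '|'` flag is what satisfies the pipe-delimiter requirement; everything else is a standard text-file import.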


#2

Part of the solution (Step 4c) in PySpark:

products = sc.textFile("/user/snehaananthan/problem2/products")
for i in products.take(10): print(i)

# Keep only products priced under 100; price is the 5th pipe-delimited field
productsFiltered = products.filter(lambda p: float(p.split("|")[4]) < 100.00)

# (category_id, price) pairs
productsMap = productsFiltered. \
    map(lambda p: (int(p.split("|")[1]), float(p.split("|")[4])))
for i in productsMap.take(100): print(i)

# Accumulator per key: (highest, count, sum, lowest).
# Lowest starts at 101.00, above every price that survives the filter.
# Python 3 removed tuple unpacking in lambda signatures, so the fields
# are accessed by index. Note the combiner must simply add the partial
# sums (a[2] + b[2]); dividing by the count here would be wrong -- the
# average is computed from (sum, count) after the aggregation.
results = productsMap. \
    aggregateByKey((0.0, 0, 0.0, 101.00),
        lambda acc, price: (
            max(acc[0], price),
            acc[1] + 1,
            acc[2] + price,
            min(acc[3], price)),
        lambda a, b: (
            max(a[0], b[0]),
            a[1] + b[1],
            a[2] + b[2],
            min(a[3], b[3])))
for i in results.take(10): print(i)
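The seqOp/combOp logic can be checked without a cluster by folding the same functions over plain Python lists, one per simulated partition, and then merging the partials. The sample prices below are made up for illustration:

```python
# (highest, count, sum, lowest) -- same zero value as in aggregateByKey
ZERO = (0.0, 0, 0.0, 101.00)

def seq_op(acc, price):
    """Fold one price into a partition-local accumulator."""
    return (max(acc[0], price), acc[1] + 1, acc[2] + price, min(acc[3], price))

def comb_op(a, b):
    """Merge two partition accumulators for the same key."""
    return (max(a[0], b[0]), a[1] + b[1], a[2] + b[2], min(a[3], b[3]))

part1 = [59.98, 21.99, 34.99]   # hypothetical prices from partition 1
part2 = [99.95, 12.50]          # hypothetical prices from partition 2

acc1 = ZERO
for p in part1:
    acc1 = seq_op(acc1, p)
acc2 = ZERO
for p in part2:
    acc2 = seq_op(acc2, p)

highest, count, total, lowest = comb_op(acc1, acc2)
print(highest, count, lowest)   # 99.95 5 12.5
print(total / count)            # average price for the key
```

Because the sum and count travel through the aggregation separately, the average falls out at the end as `total / count`, which is exactly why the combiner must not divide.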