Problem 2 - Aggregations using Spark


Aggregations can be done using the Core Spark API, DataFrame operations, or Spark SQL.

Problem statement: Using sqoop, copy the data available in the mysql products table to the folder /user/cloudera/products on hdfs as a text file. Columns should be delimited by a pipe ‘|’. Move all the files from the /user/cloudera/products folder to the /user/cloudera/problem2/products folder. Change permissions…


Part of the solution (Step 4c) in pyspark:

products = sc.textFile("/user/snehaananthan/problem2/products")
for i in products.take(10): print(i)

productsFiltered = products.filter(lambda p: float(p.split("|")[4]) < 100.00)

productsMap = productsFiltered. \
map(lambda p: (int(p.split("|")[1]), float(p.split("|")[4])))
for i in productsMap.take(100): print(i)

# Accumulator per key: (highest, count, sum, lowest).
# 101.00 is a safe starting minimum because prices were filtered to < 100.00.
results = productsMap. \
aggregateByKey((0.0, 0, 0.0, 101.00),
	# seqOp: fold one price into the accumulator of a partition
	lambda acc, input_price:
		(max(acc[0], input_price),
		acc[1] + 1,
		acc[2] + input_price,
		min(acc[3], input_price)),
	# combOp: merge two partition accumulators; keep the sum, do not divide here
	lambda acc1, acc2:
		(max(acc1[0], acc2[0]),
		acc1[1] + acc2[1],
		acc1[2] + acc2[2],
		min(acc1[3], acc2[3]))). \
mapValues(lambda acc: (acc[0], acc[1], acc[2] / acc[1], acc[3]))  # average = sum / count
for i in results.take(10): print(i)
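
The division of work between the two functions passed to aggregateByKey can be checked without a cluster. Below is a minimal pure-Python sketch, not Spark itself: the helper names seq_op, comb_op and aggregate_by_key, the partitions list and the sample prices are all hypothetical, made up for illustration. It assumes the accumulator layout (highest, count, sum, lowest) and computes the average only after every partition has been merged.

```python
ZERO = (0.0, 0, 0.0, 101.00)  # (highest, count, sum, lowest)

def seq_op(acc, price):
    """Fold one price into a partition-local accumulator."""
    highest, count, total, lowest = acc
    return (max(highest, price), count + 1, total + price, min(lowest, price))

def comb_op(a, b):
    """Merge two partition-level accumulators; sums and counts simply add."""
    return (max(a[0], b[0]), a[1] + b[1], a[2] + b[2], min(a[3], b[3]))

def aggregate_by_key(partitions, zero, seq, comb):
    """Simulate aggregateByKey: fold inside each partition, then merge across partitions."""
    merged = {}
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = seq(local.get(key, zero), value)
        for key, acc in local.items():
            merged[key] = comb(merged[key], acc) if key in merged else acc
    return merged

# Hypothetical (key, price) pairs spread over two partitions.
partitions = [
    [(2, 60.0), (2, 10.0), (3, 35.0)],
    [(2, 80.0), (3, 75.0)],
]

totals = aggregate_by_key(partitions, ZERO, seq_op, comb_op)
# Derive the average only after all partitions have been merged.
results = {k: (hi, n, s / n, lo) for k, (hi, n, s, lo) in totals.items()}
for item in sorted(results.items()):
    print(item)
```

Dividing inside comb_op would not give the same answer: the combiner can run any number of times, and an average of partial averages equals the overall average only when the partial counts happen to be equal.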