AggregateByKey issue


#1

Hi Team,

I am getting an error while using aggregateByKey. Below is the code I am running:

productsRDD=sc.textFile("/user/abhirajs25/jitendra/Practise/problem2/products/")
productsRDDFil=productsRDD.filter(lambda x : float(x.split("|")[4])<100)
productsRDDFilKey=productsRDDFil.map(lambda x: (int(x.split("|")[1]),(float(x.split("|")[4]),x.split("|")[0])))
import pyspark.sql.functions as fn
productsRDDFilKeyagg=productsRDDFilKey.aggregateByKey((0.0,0.0,0,9999999999999.0),(lambda x,y :(fn.max(x[0],y[0]),x[1]+y[0],x[2]+1,fn.min(x[3],y[0]))),(lambda x,y :(fn.max(x(0),y(0)),x(1)+y(1),x(2)+y(2),fn.min(x(3),y(3)))))

After this, when I try to view the data using the command below, I get an error:

for i in productsRDDFilKeyagg.take(10) : print i

Error: TypeError: _() takes exactly 1 argument (2 given)

I am not able to find where the problem is.

Kindly help me resolve this issue.




#2

You have to use square brackets, not parentheses, to index the tuples:

(fn.max(x[0],y[0]),x[1]+y[1],x[2]+y[2],fn.min(x[3],y[3]))
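
On top of the brackets, fn.max and fn.min from pyspark.sql.functions are DataFrame Column functions that take a single column argument (that is what the "_() takes exactly 1 argument (2 given)" error is pointing at), so they cannot be used to compare plain Python floats inside the RDD lambdas. A minimal sketch of the same aggregation with square brackets and Python's builtin max/min (the helper names zero, seq and comb are just for illustration):

# zero value per key: (max_price, sum_price, count, min_price)
zero = (0.0, 0.0, 0, 9999999999999.0)

# seq function: merge one (price, product_id) value into the accumulator
def seq(acc, val):
    return (max(acc[0], val[0]), acc[1] + val[0], acc[2] + 1, min(acc[3], val[0]))

# comb function: merge two accumulators coming from different partitions
def comb(a, b):
    return (max(a[0], b[0]), a[1] + b[1], a[2] + b[2], min(a[3], b[3]))

productsRDDFilKeyagg = productsRDDFilKey.aggregateByKey(zero, seq, comb)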


#3

Hi Annapurana,

After using parentheses, I am getting a new error:

Code:
productsRDD=sc.textFile("/user/abhirajs25/jitendra/Practise/problem2/products/")
productsRDDFil=productsRDD.filter(lambda x : float(x.split("|")[4])<100)
productsRDDFilKey=productsRDDFil.map(lambda x: (int(x.split("|")[1]),(float(x.split("|")[4]),x.split("|")[0])))
import pyspark.sql.functions as fn

productsRDDFilKeyagg=productsRDDFilKey.aggregateByKey((0.0,0.0,0,9999999999999.0),(lambda x,y :(fn.max(x(0),y(0)),x(1)+y(0),x(2)+1,fn.min(x(3),y(0)))),(lambda x,y :(fn.max(x(0),y(0)),x(1)+y(1),x(2)+y(2),fn.min(x(3),y(3)))))

for i in productsRDDFilKeyagg.take(10) : print i

Error:
TypeError: 'tuple' object is not callable

Please help.
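
The 'tuple' object is not callable error almost certainly comes from the opposite change: the code now indexes the tuples with parentheses everywhere, and x(0) tries to call the tuple x as a function instead of reading its first element. A small illustration (acc is just an example name):

acc = (0.0, 0.0, 0, 9999999999999.0)
print acc[0]   # 0.0 -- square brackets index the tuple
print acc(0)   # TypeError: 'tuple' object is not callable -- parentheses try to call it

So every x(0), y(0), x(1), ... in the aggregateByKey call should become x[0], y[0], x[1], ..., and fn.max/fn.min should still be replaced with Python's builtin max/min, as sketched above.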