aggregateByKey syntax

#1

orderItems.join(orders).map(x => ((x._2._2.split(",")(1), x._1), x._2._1.split(",")(4).toFloat)).reduceByKey(_ + _).map(x => (x._1._1, x._2)).aggregateByKey((0, 0.0))((accu, value) => (accu._1 + 1, accu._2 + value), (value1, value2) => (value1._1 + value2._1, value1._2 + value2._2))

In the above code, if we pass (0,0.0) in place of ((0,0.0)) to the aggregateByKey method, it gives the error below:
:25: error: overloaded method value aggregateByKey with alternatives:
[U](zeroValue: U, numPartitions: Int)(seqOp: (U, Float) => U, combOp: (U, U) => U)(implicit evidence$2: scala.reflect.ClassTag[U])org.apache.spark.rdd.RDD[(String, U)]
[U](zeroValue: U, partitioner: org.apache.spark.Partitioner)(seqOp: (U, Float) => U, combOp: (U, U) => U)(implicit evidence$1: scala.reflect.ClassTag[U])org.apache.spark.rdd.RDD[(String, U)]
cannot be applied to (Int, Double)

Does anyone know why we need two pairs of parentheses, (( ))?

#2

Because the output type is a tuple, the zero value must be initialized as a tuple.

#3

If we write it as (0,0), doesn't that represent a tuple?

#4

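Not inside a method call. The Scala signature is aggregateByKey(zeroValue)(seqOp, combOp), so the first pair of parentheses is the argument list itself, and aggregateByKey(0,0.0) is read as two arguments, an Int and a Double. The tuple needs its own parentheses inside that list: aggregateByKey((0,0.0)). This is my understanding; correct me if I am wrong.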
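
For anyone who wants to try this without a cluster, here is a minimal sketch in plain Scala. Everything in it is an assumption made up for illustration: PairOps is a hypothetical stand-in on Seq, not Spark's class, and it only mimics the shape of two of the overloads (zeroValue alone, and zeroValue with numPartitions). It should still show the same parenthesization rule.

```scala
object AggregateByKeySketch {

  // Hypothetical stand-in for Spark's pair-RDD functions, working on a plain Seq.
  implicit class PairOps[K, V](val data: Seq[(K, V)]) {

    // Same shape as the one-argument overload: the zero value is a single argument.
    def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] =
      aggregateByKey(zeroValue, 2)(seqOp, combOp)

    // Same shape as the (zeroValue, numPartitions) overload from the error message.
    def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
      // Pretend the data is split into chunks that play the role of partitions.
      val chunkSize = math.max(1, math.ceil(data.size.toDouble / numPartitions).toInt)
      // seqOp folds the values of each key inside one chunk, starting from zeroValue.
      val partials = data.grouped(chunkSize).map { part =>
        part.foldLeft(Map.empty[K, U]) { case (m, (k, v)) =>
          m.updated(k, seqOp(m.getOrElse(k, zeroValue), v))
        }
      }
      // combOp merges the per-chunk results for the same key.
      partials.foldLeft(Map.empty[K, U]) { (merged, partial) =>
        partial.foldLeft(merged) { case (m, (k, u)) =>
          m.updated(k, m.get(k).map(combOp(_, u)).getOrElse(u))
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Made-up (date, revenue) pairs.
    val revenuePerDay = Seq(
      ("2013-07-25", 10.0f), ("2013-07-25", 20.0f),
      ("2013-07-25", 30.0f), ("2013-07-26", 5.0f)
    )

    // Outer parentheses: argument list. Inner parentheses: the (Int, Double) tuple.
    val stats = revenuePerDay.aggregateByKey((0, 0.0))(
      (acc, value) => (acc._1 + 1, acc._2 + value),
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )
    println(stats("2013-07-25")) // (3,60.0)
    println(stats("2013-07-26")) // (1,5.0)

    // revenuePerDay.aggregateByKey(0, 0.0)(...) would not compile here either:
    // two arguments select the (zeroValue, numPartitions: Int) shape, and 0.0 is not an Int.
  }
}
```

With a single pair of parentheses the compiler sees two separate arguments and looks for a two-argument overload, which matches the "cannot be applied to (Int, Double)" line in the error from the first post.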

#5

@praveen - you are correct
