reduceByKey() usage - confused by the results

Hi All,

I was playing around with the reduceByKey() function while diving deep into the scan, fold, and reduce functions of Scala.

I understand how reduceByKey() works in the standard case below.

General Case

scala> val rdd = sc.parallelize(((1 to 10).map(x=> ("key",x))))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[15] at parallelize at <console>:27

scala> rdd.reduceByKey(_+_)
res21: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[16] at reduceByKey at <console>:30

scala> res21.collect
res22: Array[(String, Int)] = Array((key,55))

But I am curious to understand how the cases below work.

Case 1:

scala> val rdd = sc.parallelize(((1 to 10).map(x=> ("key",x))))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[15] at parallelize at <console>:27

scala> rdd.reduceByKey(_-_)
res23: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[17] at reduceByKey at <console>:30

scala> res23.collect
res24: Array[(String, Int)] = Array((key,15))

Case 2:

scala> val rdd = sc.parallelize(((1 to 10).map(x=> ("key",x))),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[18] at parallelize at <console>:27

scala> rdd.reduceByKey(_-_)
res25: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:30

scala> res25.collect
res26: Array[(String, Int)] = Array((key,15))

Case 3:

scala> val rdd = sc.parallelize(((1 to 10).map(x=> ("key",x))),1)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[20] at parallelize at <console>:27

scala> rdd.reduceByKey(_-_)
res27: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[21] at reduceByKey at <console>:30

scala> res27.collect
res28: Array[(String, Int)] = Array((key,-53))

Case 4:

scala> val rdd = sc.parallelize(((1 to 10).map(x=> ("key",x))),10)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[22] at parallelize at <console>:27

scala> rdd.reduceByKey(_-_)
res29: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:30

scala> res29.collect
res30: Array[(String, Int)] = Array((key,-51))

In the above cases, the same function gives me different results depending on the number of partitions.
So, if I understand correctly, reduceByKey works in a cumulative, sequential way from left to right.

How does that function work internally?
Is there a way for us to identify what is present in each partition to determine the results?

I appreciate your help & response.

Thanks,
Ashok

reduceByKey aggregates within each partition first and shuffles only the results of those per-partition sub-aggregations.
Subtraction is neither associative nor commutative, so do not expect the same results with a different number of partitions.
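To answer your second question, you can tag each element with the index of the partition it lives in. Here is a minimal sketch using the standard mapPartitionsWithIndex API (it assumes a live SparkContext sc, as in your session):

scala> val rdd = sc.parallelize((1 to 10).map(x => ("key", x)), 2)

scala> rdd.mapPartitionsWithIndex((i, it) => it.map(kv => (i, kv))).collect().foreach(println)

With 2 partitions this prints (0,(key,1)) through (0,(key,5)) followed by (1,(key,6)) through (1,(key,10)), which is exactly the split used in the case 2 walkthrough below.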

Case 3:

one partition: 1,2,3,4,5,6,7,8,9,10
1-2-3-4-5-6-7-8-9-10 = -53

Case 2:

2 partitions: (1,2,3,4,5), (6,7,8,9,10)
1-2-3-4-5 = -13
6-7-8-9-10 = -28

(-13) - (-28) = 15

Case 4:

10 partitions: 1,2,3,4,5,6,7,8,9,10, one value per partition, so nothing is reduced on the map side and the final result depends entirely on the order in which the shuffled partials are merged. Merging in ascending order gives:

1-2-3-4-5-6-7-8-9-10 = -53

I am not sure why you got -51 for case 4. When I ran the same code I got -53.

scala> val rdd = sc.parallelize(((1 to 10).map(x=> ("key",x))),10)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:27

scala> rdd.reduceByKey(_-_)
res3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at reduceByKey at <console>:30

scala> res3.collect
res4: Array[(String, Int)] = Array((key,-53))
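If it helps, the per-partition arithmetic can be reproduced with plain Scala collections. A sketch of case 2, where grouped(5) mimics the even two-way split and the outer reduce mimics the merge of the shuffled partials:

// simulate case 2: two "partitions" of five values each
val partitions = (1 to 10).grouped(5).toList      // List(1..5) and List(6..10)
val partials   = partitions.map(_.reduce(_ - _))  // List(-13, -28)
val merged     = partials.reduce(_ - _)           // (-13) - (-28) = 15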

Hi Pranay,

Thanks for your time and reply.
I understood how reduceByKey works across the shuffle. But I still do not understand how Spark does the partitioning.
I am still getting the same result with 10 partitions.

In case 2, you split the data equally, as we have 10 numbers and 2 partitions. What happens if we have 11 numbers? How does the partitioning work, and which numbers go into each partition?

Thanks,
Ashok

  1. To understand how Spark does partitioning, take a look at this: https://acadgild.com/blog/partitioning-in-spark/.

  2. Regarding "I am still getting the same result with 10 partitions":

The result depends on how the values are merged after the shuffle, which depends on spark.shuffle.manager. Since Spark 1.2 the default is sort. So I am not sure why you got -51.
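You can check which shuffle manager your session is actually using; a one-line sketch ("sort" is the documented default from Spark 1.2 on):

// read the active shuffle manager from the SparkContext's configuration
sc.getConf.get("spark.shuffle.manager", "sort")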

  3. Regarding "What happens if we have 11 numbers? How does the partitioning work, and which numbers go into each partition?":

5 items will go to one partition and 6 will go to the other. You can see it for yourself:

sc.parallelize(((1 to 11).map(x=> ("key",x))),2).foreachPartition(r=> {println("||"); r.toList.foreach(println)})
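A driver-side alternative is glom(), which turns each partition into an array you can collect and inspect, without reading executor output. A sketch (the expected split follows from parallelize slicing a sequence by position, roughly positions i*n/p until (i+1)*n/p for slice i, if I read ParallelCollectionRDD correctly):

// glom() exposes each partition as an array on the driver
sc.parallelize((1 to 11).map(x => ("key", x)), 2)
  .glom()
  .map(_.map(_._2).toList)   // keep just the numbers for readability
  .collect()
  .foreach(println)
// expected: List(1, 2, 3, 4, 5) then List(6, 7, 8, 9, 10, 11)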

@Pranaysahith: Thanks for your input. It's an interesting article.