CombineByKey giving Incorrect Results

apache-spark
pyspark
#1

In the context of Certification -> Spark-Python -> ByKey aggregation operations:

I was trying to calculate revenue per day step by step, as discussed in the videos.

As explained there, when the input and output types are the same and the operation is additive, all three of reduceByKey, aggregateByKey, and combineByKey should give the same output.

But as you can see below, reduceByKey and aggregateByKey give the same output, while combineByKey gives a different one.

The sums produced by combineByKey are lower, and I can't understand why.

# key: (order_date, order_id), value: order_item_subtotal
ordersJoinOrderItemsMap = ordersJoinOrderItems.map(lambda rec: ((rec[1][1].split(",")[1], rec[0]), float(rec[1][0].split(",")[4])))

# revenue per (date, order)
revenuePerDayPerOrder = ordersJoinOrderItemsMap.reduceByKey(lambda x, y: x + y)

# drop the order id, keeping (date, order_revenue)
revenuePerDayPerOrderMap = revenuePerDayPerOrder.map(lambda x: (x[0][0], x[1]))

# total revenue per date
revenuePerDay = revenuePerDayPerOrderMap.reduceByKey(lambda x, y: x + y)

for i in revenuePerDay.sortByKey().take(10): print i

# Output :
------------------------------------------------
(u'2013-07-25 00:00:00.0', 68153.830000000002)
(u'2013-07-26 00:00:00.0', 136520.17000000001)
(u'2013-07-27 00:00:00.0', 101074.34)
(u'2013-07-28 00:00:00.0', 87123.079999999987)
(u'2013-07-29 00:00:00.0', 137287.09000000003)
(u'2013-07-30 00:00:00.0', 102745.62000000001)
(u'2013-07-31 00:00:00.0', 131878.06)
(u'2013-08-01 00:00:00.0', 129001.62000000002)
(u'2013-08-02 00:00:00.0', 109347.00000000004)
(u'2013-08-03 00:00:00.0', 95266.889999999999)
------------------------------------------------

# Using aggregateByKey
revenuePerDayAggrBy = revenuePerDayPerOrderMap.aggregateByKey(0.0, lambda acc, value: acc + value, lambda acc1, acc2: acc1 + acc2)
for i in revenuePerDayAggrBy.sortByKey().take(10): print i

# Output :
------------------------------------------------
(u'2013-07-25 00:00:00.0', 68153.830000000002)
(u'2013-07-26 00:00:00.0', 136520.17000000001)
(u'2013-07-27 00:00:00.0', 101074.34)
(u'2013-07-28 00:00:00.0', 87123.079999999987)
(u'2013-07-29 00:00:00.0', 137287.09000000003)
(u'2013-07-30 00:00:00.0', 102745.62000000001)
(u'2013-07-31 00:00:00.0', 131878.06)
(u'2013-08-01 00:00:00.0', 129001.62000000002)
(u'2013-08-02 00:00:00.0', 109347.00000000004)
(u'2013-08-03 00:00:00.0', 95266.889999999999)
------------------------------------------------

# Using combineByKey - I ran combineByKey twice, with different values for the first argument (createCombiner)

revenuePerDayCombBy = revenuePerDayPerOrderMap.combineByKey(lambda val: 0.0, lambda acc, value: acc + value, lambda acc1, acc2: acc1 + acc2)
for i in revenuePerDayCombBy.sortByKey().take(10): print i

# Output :
------------------------------------------------
(u'2013-07-25 00:00:00.0', 53845.660000000011)
(u'2013-07-26 00:00:00.0', 121746.00000000001)
(u'2013-07-27 00:00:00.0', 88606.950000000012)
(u'2013-07-28 00:00:00.0', 72655.710000000006)
(u'2013-07-29 00:00:00.0', 123518.83000000003)
(u'2013-07-30 00:00:00.0', 89146.139999999999)
(u'2013-07-31 00:00:00.0', 116041.84)
(u'2013-08-01 00:00:00.0', 112467.52000000002)
(u'2013-08-02 00:00:00.0', 96849.580000000016)
(u'2013-08-03 00:00:00.0', 79462.079999999987)
------------------------------------------------

revenuePerDayCombBy = revenuePerDayPerOrderMap.combineByKey(lambda value: 1.0, lambda acc, value: acc + value, lambda acc1, acc2: acc1 + acc2)
for i in revenuePerDayCombBy.sortByKey().take(10): print i

# Output :
------------------------------------------------
(u'2013-07-25 00:00:00.0', 53869.660000000011)
(u'2013-07-26 00:00:00.0', 121770.00000000001)
(u'2013-07-27 00:00:00.0', 88630.950000000012)
(u'2013-07-28 00:00:00.0', 72679.710000000006)
(u'2013-07-29 00:00:00.0', 123542.83000000003)
(u'2013-07-30 00:00:00.0', 89170.139999999999)
(u'2013-07-31 00:00:00.0', 116065.84)
(u'2013-08-01 00:00:00.0', 112491.52000000002)
(u'2013-08-02 00:00:00.0', 96873.580000000016)
(u'2013-08-03 00:00:00.0', 79486.079999999987)
------------------------------------------------
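
To make this easier to reproduce, here is the same behaviour on toy data (made-up numbers, assuming an existing SparkContext sc):

data = [("d1", 10.0), ("d1", 20.0), ("d1", 30.0), ("d2", 5.0), ("d2", 15.0)]
rdd = sc.parallelize(data, 2)  # more than one partition, as on a real cluster

# both give the correct sums: 60.0 for d1, 20.0 for d2
print rdd.reduceByKey(lambda x, y: x + y).collect()
print rdd.aggregateByKey(0.0, lambda acc, v: acc + v, lambda a, b: a + b).collect()

# the sums come out lower here as well
print rdd.combineByKey(lambda v: 0.0, lambda acc, v: acc + v, lambda a, b: a + b).collect()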

Can anyone please explain the difference? What is happening here?


#2

Can you paste the complete script you have used for combineByKey?


#3

This should be your combineByKey logic:

revenuePerDayCombBy = revenuePerDayPerOrderMap.combineByKey(lambda val: val, lambda acc, value: acc + value, lambda acc1, acc2: acc1 + acc2)


#4

But why does this work? Any explanation?


#5

combineByKey gives you additional flexibility to change the values of a given key before applying the aggregation logic: the first argument (createCombiner) is called on the first value seen for each key in each partition, and whatever it returns becomes the initial accumulator. Unlike the zero value in aggregateByKey, it receives a real value and must not discard it. With lambda val: 0.0 you threw away the first value of every key in every partition, which is why your sums came up short; with lambda value: 1.0 each of those first values was replaced by 1.0 instead. Notice that every total in your second run is exactly 24.0 higher than in the first run - one 1.0 for each partition in which that key appears, evidently 24 partitions here.

revenuePerDayCombBy = revenuePerDayPerOrderMap.combineByKey(
    lambda val: val,                 # createCombiner: keep the first value; if you want to add some constant to each value, do it here
    lambda acc, value: acc + value,  # mergeValue: fold the next value for this key into the accumulator
    lambda acc1, acc2: acc1 + acc2)  # mergeCombiners: merge the per-partition accumulators
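
This flexibility matters most when the accumulator type differs from the value type, which reduceByKey cannot express. As a sketch (not from the videos; avgRevenuePerDay is just an illustrative name), computing the average order revenue per day from the same revenuePerDayPerOrderMap:

avgRevenuePerDay = revenuePerDayPerOrderMap.combineByKey(
    lambda v: (v, 1),                          # createCombiner: first value -> (sum, count)
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # mergeValue: fold one more value into (sum, count)
    lambda a, b: (a[0] + b[0], a[1] + b[1])    # mergeCombiners: merge partial (sum, count) pairs
).mapValues(lambda p: p[0] / p[1])             # average = sum / count

Here the values are floats but the accumulator is a (sum, count) pair, so a createCombiner that builds the initial pair from the first value is essential.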
