Aggregate dataset using reduce function

#1

Hello @itversity Durga Sir,
I was following your tutorial on aggregating a dataset using the reduce function.
When I use the reduce function it does not give the aggregated value at the end, and when I try to do a collect() on my RDD it gives the error below.

orderTotal = orderMap.reduce(lambda a, b:a+b)
17/01/01 18:57:19 INFO spark.SparkContext: Starting job: reduce at :1
17/01/01 18:57:19 INFO scheduler.DAGScheduler: Got job 5 (reduce at :1) with 1 output partitions
17/01/01 18:57:19 INFO scheduler.DAGScheduler: Final stage: ResultStage 5(reduce at :1)
17/01/01 18:57:19 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/01/01 18:57:19 INFO scheduler.DAGScheduler: Missing parents: List()
17/01/01 18:57:19 INFO scheduler.DAGScheduler: Submitting ResultStage 5 (PythonRDD[7] at reduce at :1), which has no missing parents
17/01/01 18:57:19 INFO storage.MemoryStore: ensureFreeSpace(5616) called with curMem=155481, maxMem=560497950
17/01/01 18:57:19 INFO storage.MemoryStore: Block broadcast_6 stored as values in memory (estimated size 5.5 KB, free 534.4 MB)
17/01/01 18:57:19 INFO storage.MemoryStore: ensureFreeSpace(3520) called with curMem=161097, maxMem=560497950
17/01/01 18:57:19 INFO storage.MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 3.4 KB, free 534.4 MB)
17/01/01 18:57:19 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:55366 (size: 3.4 KB, free: 534.5 MB)
17/01/01 18:57:19 INFO spark.SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:861
17/01/01 18:57:19 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 5 (PythonRDD[7] at reduce at :1)
17/01/01 18:57:19 INFO scheduler.TaskSchedulerImpl: Adding task set 5.0 with 1 tasks
17/01/01 18:57:20 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 5.0 (TID 5, localhost, PROCESS_LOCAL, 2153 bytes)
17/01/01 18:57:20 INFO executor.Executor: Running task 0.0 in stage 5.0 (TID 5)
17/01/01 18:57:20 INFO rdd.HadoopRDD: Input split: hdfs://localhost:8020/user/edureka/float.txt:0+40
17/01/01 18:57:20 INFO python.PythonRunner: Times: total = 32, boot = 10, init = 22, finish = 0
17/01/01 18:57:20 INFO executor.Executor: Finished task 0.0 in stage 5.0 (TID 5). 2131 bytes result sent to driver
17/01/01 18:57:20 INFO scheduler.DAGScheduler: ResultStage 5 (reduce at :1) finished in 0.167 s
17/01/01 18:57:20 INFO scheduler.DAGScheduler: Job 5 finished: reduce at :1, took 0.253421 s
17/01/01 18:57:20 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 5.0 (TID 5) in 170 ms on localhost (1/1)
17/01/01 18:57:20 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool

orderTotal.collect()
Traceback (most recent call last):
File "", line 1, in
AttributeError: 'float' object has no attribute 'collect'

please help. thank you.


#3

You can call collect() only on an RDD. In the code you executed, reduce returns a float, so you can just print it.
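This is exactly what the traceback above shows. A minimal stand-alone reproduction (plain Python, no Spark needed, since the returned value is an ordinary float either way):

```python
# reduce on an RDD of numbers returns a plain Python float, e.g.:
total = 3.14

# Calling collect() on it fails, because floats have no such method.
try:
    total.collect()
except AttributeError as e:
    print(e)  # 'float' object has no attribute 'collect'

# The value itself can simply be printed.
print(total)
```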


#4

Hello Pramod,
orderTotal is an RDD.
orderTotal = orderMap.reduce(lambda a, b: a + b)

orderTotal.collect()

thanks.


#5

reduce is an action, not a transformation, so it returns a plain value, not an RDD: http://spark.apache.org/docs/latest/programming-guide.html#basics
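The same distinction exists in plain Python, which may make it easier to see: map produces another sequence, while reduce collapses a sequence into a single value. A minimal sketch using the built-in functools.reduce, with some made-up order amounts standing in for the data in orderMap:

```python
from functools import reduce

# Hypothetical per-order amounts, standing in for the values in orderMap
order_amounts = [199.99, 250.0, 129.99, 49.98]

# Like an RDD transformation: map yields another sequence
doubled = list(map(lambda x: x * 2, order_amounts))

# Like an RDD action: reduce collapses the sequence to one value
total = reduce(lambda a, b: a + b, order_amounts)

print(type(total).__name__)  # float, not a sequence
print(total)
```

Since `total` is a float, there is nothing left to collect; it is already the final result.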


#6

Ohh, okay, got that.
But then when I run
orderTotal = orderMap.reduce(lambda a, b: a+b)
and check the log, it should display the aggregate value at the end, which it does not in my case.
So I tried doing a collect() by treating the result of the reduce (orderTotal = orderMap.reduce(lambda a, b: a+b)) as an RDD.
But if reduce does not display the aggregate value, then how do I get it?
(Please check my log and all the steps above.)

thank you. :slight_smile:


#7

Since reduce is an action and not a transformation, you do not need to assign the result to a variable. You can write it directly as

orderMap.reduce(lambda a,b: a+b)

and it will print the value. But if you do assign it to a variable, you do not need collect(). Just type the variable name and press Enter, like below:

orderTotal (press enter)
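Putting the thread's advice together, here is a hedged sketch of the corrected flow. The RDD class below is a tiny made-up stand-in (not real Spark) so the snippet runs anywhere; in actual PySpark the same calls work on a real RDD:

```python
from functools import reduce


class FakeRDD:
    """Tiny stand-in for a Spark RDD, only to show reduce returning a plain value."""

    def __init__(self, data):
        self.data = data

    def map(self, f):
        # Transformation: returns another (fake) RDD
        return FakeRDD([f(x) for x in self.data])

    def reduce(self, f):
        # Action: collapses the data to a single value
        return reduce(f, self.data)


orderMap = FakeRDD([10.5, 20.25, 30.0])

# reduce is an action: the result is a float, not an RDD
orderTotal = orderMap.reduce(lambda a, b: a + b)

# No collect() needed; just print (or evaluate) the variable
print(orderTotal)  # 60.75
```

In the interactive pyspark shell, evaluating `orderTotal` on its own line prints the value the same way `print` does here.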


#8

@N_Chakote ohh, got that.
Will try that today and let you know.
thanks much :slight_smile:


#9

That works… thanks for clearing this up :slight_smile:
