PySpark - reduce() function issue

Hi Team,

I am not able to see the total revenue returned by the reduce() function. Please find the commands and logs below:

order_items = sc.textFile("/user/shubhaprasadsamal/training/sqoop_import/order_items")

17/01/26 18:29:50 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 336.8 KB, free 711.2 KB)
17/01/26 18:29:50 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 28.4 KB, free 739.6 KB)
17/01/26 18:29:50 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:39853 (size: 28.4 KB, free: 511.1 MB)
17/01/26 18:29:50 INFO SparkContext: Created broadcast 2 from textFile at NativeMethodAccessorImpl.java:-2

order_itemsRDD = order_items.map(lambda x: float(x.split(",")[4]))
totalRev = order_itemsRDD.reduce(lambda x, y: x + y)

17/01/26 18:31:21 INFO FileInputFormat: Total input paths to process : 4
17/01/26 18:31:21 INFO SparkContext: Starting job: reduce at :1
17/01/26 18:31:21 INFO DAGScheduler: Got job 1 (reduce at :1) with 4 output partitions
17/01/26 18:31:21 INFO DAGScheduler: Final stage: ResultStage 1 (reduce at :1)
17/01/26 18:31:21 INFO DAGScheduler: Parents of final stage: List()
17/01/26 18:31:21 INFO DAGScheduler: Missing parents: List()
17/01/26 18:31:21 INFO DAGScheduler: Submitting ResultStage 1 (PythonRDD[5] at reduce at :1), which has no missing parents
17/01/26 18:31:21 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 5.7 KB, free 745.3 KB)
17/01/26 18:31:21 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.6 KB, free 748.8 KB)
17/01/26 18:31:21 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:39853 (size: 3.6 KB, free: 511.1 MB)
17/01/26 18:31:21 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1008
17/01/26 18:31:21 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 1 (PythonRDD[5] at reduce at :1)
17/01/26 18:31:21 INFO TaskSchedulerImpl: Adding task set 1.0 with 4 tasks
17/01/26 18:31:21 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 4, localhost, partition 0,ANY, 2202 bytes)
17/01/26 18:31:21 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 5, localhost, partition 1,ANY, 2202 bytes)
17/01/26 18:31:21 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 6, localhost, partition 2,ANY, 2202 bytes)
17/01/26 18:31:21 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 7, localhost, partition 3,ANY, 2202 bytes)
17/01/26 18:31:21 INFO Executor: Running task 0.0 in stage 1.0 (TID 4)
17/01/26 18:31:21 INFO Executor: Running task 1.0 in stage 1.0 (TID 5)
17/01/26 18:31:21 INFO Executor: Running task 2.0 in stage 1.0 (TID 6)
17/01/26 18:31:21 INFO Executor: Running task 3.0 in stage 1.0 (TID 7)
17/01/26 18:31:21 INFO HadoopRDD: Input split: hdfs://nn01.itversity.com:8020/user/shubhaprasadsamal/training/sqoop_import/order_items/part-m-00000:0+1303818
17/01/26 18:31:21 INFO HadoopRDD: Input split: hdfs://nn01.itversity.com:8020/user/shubhaprasadsamal/training/sqoop_import/order_items/part-m-00001:0+1343222
17/01/26 18:31:21 INFO HadoopRDD: Input split: hdfs://nn01.itversity.com:8020/user/shubhaprasadsamal/training/sqoop_import/order_items/part-m-00002:0+1371917
17/01/26 18:31:21 INFO HadoopRDD: Input split: hdfs://nn01.itversity.com:8020/user/shubhaprasadsamal/training/sqoop_import/order_items/part-m-00003:0+1389923
17/01/26 18:31:22 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:39853 in memory (size: 3.5 KB, free: 511.1 MB)
17/01/26 18:31:22 INFO ContextCleaner: Cleaned accumulator 2
17/01/26 18:31:22 INFO PythonRunner: Times: total = 234, boot = 4, init = 20, finish = 210
17/01/26 18:31:22 INFO Executor: Finished task 1.0 in stage 1.0 (TID 5). 2186 bytes result sent to driver
17/01/26 18:31:22 INFO PythonRunner: Times: total = 239, boot = 3, init = 33, finish = 203
17/01/26 18:31:22 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 5) in 249 ms on localhost (1/4)
17/01/26 18:31:22 INFO Executor: Finished task 0.0 in stage 1.0 (TID 4). 2186 bytes result sent to driver
17/01/26 18:31:22 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 4) in 257 ms on localhost (2/4)
17/01/26 18:31:22 INFO PythonRunner: Times: total = 326, boot = 2, init = 25, finish = 299
17/01/26 18:31:22 INFO Executor: Finished task 2.0 in stage 1.0 (TID 6). 2186 bytes result sent to driver
17/01/26 18:31:22 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 6) in 342 ms on localhost (3/4)
17/01/26 18:31:22 INFO PythonRunner: Times: total = 331, boot = 6, init = 17, finish = 308
17/01/26 18:31:22 INFO Executor: Finished task 3.0 in stage 1.0 (TID 7). 2186 bytes result sent to driver
17/01/26 18:31:22 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 7) in 346 ms on localhost (4/4)
17/01/26 18:31:22 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
17/01/26 18:31:22 INFO DAGScheduler: ResultStage 1 (reduce at :1) finished in 0.348 s
17/01/26 18:31:22 INFO DAGScheduler: Job 1 finished: reduce at :1, took 0.358581 s

I am facing the same issue while trying to find the max(revenue) using reduce().

Please suggest. Thanks,

Shitansu.

@shubhaprasadsamal If the terminal is freezing, you can run pyspark in yarn-client mode.

@shubhaprasadsamal Did you try just typing totalRev and pressing Enter in the CLI? That should display the value.

@shubhaprasadsamal: Yes, since reduce() is an action, it returns a plain value to the driver rather than an RDD. Type totalRev and press Enter and you will get the result.
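To illustrate the replies above: the logs show the job finished fine; the result was simply stored in totalRev and never printed. A minimal sketch of the same pipeline, using Python's functools.reduce on a few hypothetical order_items lines (the sample values are made up; on the cluster the lambda is identical):

```python
from functools import reduce

# Hypothetical sample of order_items lines; as in the original dataset,
# the 5th field (index 4) is the line-item subtotal.
lines = [
    "1,1,957,1,299.98,299.98",
    "2,2,1073,1,199.99,199.99",
    "3,2,502,5,250.0,50.0",
]

# Same transformation as order_items.map(...), expressed on a plain list.
subtotals = [float(line.split(",")[4]) for line in lines]

# reduce() is an action: it returns an ordinary number, not an RDD,
# so nothing is displayed until you print it (or echo it in the REPL).
totalRev = reduce(lambda x, y: x + y, subtotals)
print(totalRev)
```

In the pyspark shell, typing `totalRev` and pressing Enter echoes the value the same way `print(totalRev)` does in a script.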

Thank you Ran and Avinash.

Issue is resolved.
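For the max(revenue) case mentioned earlier in the thread, the same pattern applies: reduce() returns the value, which must then be echoed or printed. A sketch with hypothetical subtotals, using functools.reduce; on the RDD the equivalent would be `order_itemsRDD.reduce(lambda x, y: x if x > y else y)`:

```python
from functools import reduce

# Hypothetical subtotals standing in for the revenue column of order_items.
subtotals = [299.98, 199.99, 250.0]

# max(revenue) with reduce(): keep the larger of each pair of values.
maxRev = reduce(lambda x, y: x if x > y else y, subtotals)
print(maxRev)
```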