Job Stuck while performing a join

PySpark

Code:
###############################################################################
# Load the orders and order_items datasets exported by Sqoop
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")

orderItemsRDD = sc.textFile("/user/cloudera/sqoop_import/order_items")

# Keep only CANCELED orders and key them by order_id
ordersParsedRDD = ordersRDD.filter(lambda rec: rec.split(",")[3] in "CANCELED").map(lambda rec: (int(rec.split(",")[0]), rec))

# Key each order item by order_id, with its subtotal as the value
orderItemsParsedRDD = orderItemsRDD.map(lambda rec: (int(rec.split(",")[1]), float(rec.split(",")[4])))

# Sum the subtotals per order
orderItemsAgg = orderItemsParsedRDD.reduceByKey(lambda acc, value: (acc + value))

# Join the per-order totals with the canceled orders
ordersJoinOrderItems = orderItemsAgg.join(ordersParsedRDD)

# Print up to 5 canceled orders whose total is at least 1000
for i in ordersJoinOrderItems.filter(lambda rec: rec[1][0] >= 1000).take(5): print(i)
################################################################################

Logs Generated in PySpark:
17/05/16 21:34:37 INFO spark.SparkContext: Starting job: runJob at PythonRDD.scala:393
17/05/16 21:34:37 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 86 bytes
17/05/16 21:34:37 INFO scheduler.DAGScheduler: Got job 2 (runJob at PythonRDD.scala:393) with 1 output partitions
17/05/16 21:34:37 INFO scheduler.DAGScheduler: Final stage: ResultStage 7 (runJob at PythonRDD.scala:393)
17/05/16 21:34:37 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 6)
17/05/16 21:34:37 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 6)
17/05/16 21:34:37 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 6 (PairwiseRDD[13] at join at :1), which has no missing parents
17/05/16 21:34:37 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 12.3 KB, free 330.9 KB)
17/05/16 21:34:37 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 6.8 KB, free 337.7 KB)
17/05/16 21:34:37 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:60299 (size: 6.8 KB, free: 534.5 MB)
17/05/16 21:34:37 INFO spark.SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006
17/05/16 21:34:37 INFO scheduler.DAGScheduler: Submitting 8 missing tasks from ShuffleMapStage 6 (PairwiseRDD[13] at join at :1)
17/05/16 21:34:37 INFO scheduler.TaskSchedulerImpl: Adding task set 6.0 with 8 tasks
17/05/16 21:34:37 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 6.0 (TID 9, localhost, partition 0,NODE_LOCAL, 1992 bytes)
17/05/16 21:34:37 INFO executor.Executor: Running task 0.0 in stage 6.0 (TID 9)
17/05/16 21:34:37 INFO storage.ShuffleBlockFetcherIterator: Getting 4 non-empty blocks out of 4 blocks
17/05/16 21:34:37 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/05/16 21:34:38 INFO python.PythonRunner: Times: total = 99, boot = 4, init = 13, finish = 82
17/05/16 21:34:38 INFO python.PythonRunner: Times: total = 208, boot = 7, init = 21, finish = 180
17/05/16 21:34:38 INFO executor.Executor: Finished task 0.0 in stage 6.0 (TID 9). 1445 bytes result sent to driver
17/05/16 21:34:38 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 6.0 (TID 10, localhost, partition 1,NODE_LOCAL, 1992 bytes)
17/05/16 21:34:38 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 6.0 (TID 9) in 345 ms on localhost (1/8)
17/05/16 21:34:38 INFO executor.Executor: Running task 1.0 in stage 6.0 (TID 10)
17/05/16 21:34:38 INFO storage.ShuffleBlockFetcherIterator: Getting 4 non-empty blocks out of 4 blocks
17/05/16 21:34:38 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/05/16 21:34:38 INFO python.PythonRunner: Times: total = 68, boot = -232, init = 235, finish = 65
17/05/16 21:34:38 INFO python.PythonRunner: Times: total = 151, boot = -111, init = 141, finish = 121
17/05/16 21:34:38 INFO executor.Executor: Finished task 1.0 in stage 6.0 (TID 10). 1445 bytes result sent to driver
17/05/16 21:34:38 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 6.0 (TID 11, localhost, partition 2,NODE_LOCAL, 1992 bytes)
17/05/16 21:34:38 INFO executor.Executor: Running task 2.0 in stage 6.0 (TID 11)
17/05/16 21:34:38 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 6.0 (TID 10) in 243 ms on localhost (2/8)
17/05/16 21:34:38 INFO storage.ShuffleBlockFetcherIterator: Getting 4 non-empty blocks out of 4 blocks
17/05/16 21:34:38 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 6 ms
17/05/16 21:34:38 INFO python.PythonRunner: Times: total = 71, boot = -134, init = 140, finish = 65
17/05/16 21:34:38 INFO python.PythonRunner: Times: total = 150, boot = -47, init = 68, finish = 129
17/05/16 21:34:38 INFO executor.Executor: Finished task 2.0 in stage 6.0 (TID 11). 1445 bytes result sent to driver
17/05/16 21:34:38 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 6.0 (TID 12, localhost, partition 3,NODE_LOCAL, 1992 bytes)
17/05/16 21:34:38 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 6.0 (TID 11) in 233 ms on localhost (3/8)
17/05/16 21:34:38 INFO executor.Executor: Running task 3.0 in stage 6.0 (TID 12)
17/05/16 21:34:38 INFO storage.ShuffleBlockFetcherIterator: Getting 4 non-empty blocks out of 4 blocks
17/05/16 21:34:38 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
17/05/16 21:34:38 INFO python.PythonRunner: Times: total = 69, boot = -143, init = 148, finish = 64
17/05/16 21:34:38 INFO python.PythonRunner: Times: total = 141, boot = -50, init = 74, finish = 117
17/05/16 21:34:38 INFO executor.Executor: Finished task 3.0 in stage 6.0 (TID 12). 1445 bytes result sent to driver
17/05/16 21:34:38 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 6.0 (TID 12) in 221 ms on localhost (4/8)
####################################################################################

I tried a normal for loop without the filter and also tried restarting, but nothing works. Please help.
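For reference, the plain loop I mentioned (without the filter) was along these lines, and it hangs at the same point:

for i in ordersJoinOrderItems.take(5): print(i)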

Thanks and Regards,
Udit Arora

This is quite common on virtual machines due to a lack of resources. You can try running it in YARN mode, but there is no guarantee that it will work.
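For example, you could launch the shell against YARN with modest executor sizes; the flags below are standard spark-submit/pyspark options, and the exact numbers are only placeholders to tune for your VM:

pyspark --master yarn --num-executors 2 --executor-memory 512M --executor-cores 1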

I tried running it in YARN mode and the issue was solved. If we simply run pyspark on the command line, is it running in local mode or YARN mode? Because if we need to run in local mode we have to enter: pyspark --master local
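One way to check is to print the master URL from inside the shell itself; sc is the SparkContext the shell creates for you, and the default master depends on the spark.master setting in your spark-defaults.conf rather than being fixed:

print(sc.master)   # prints e.g. 'local[*]' or 'yarn-client' depending on how the shell was started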