Filter - Join - Not able to print the joined data

Hi,

I am not able to print the joined tuples. Please find the commands and the log output below:

ordersRDD = sc.textFile("/user/shubhaprasadsamal/training/sqoop_import/orders")
orderItemsRDD = sc.textFile("/user/shubhaprasadsamal/training/sqoop_import/order_items")

ordersParsedRDD = ordersRDD.filter(lambda rec: rec.split(",")[3] in "CANCELED").map(lambda rec: (int(rec.split(",")[0]), rec))

orderItemsParsedRDD = orderItemsRDD.map(lambda rec: (int(rec.split(",")[1]), float(rec.split(",")[4])))

orderItemsAgg = orderItemsParsedRDD.reduceByKey(lambda acc, value: (acc + value))

join = orderItemsAgg.join(ordersParsedRDD)
for i in join.take(5):
    print(i)

17/02/06 14:02:23 INFO SparkContext: Starting job: runJob at PythonRDD.scala:393
17/02/06 14:02:23 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 166 bytes
17/02/06 14:02:23 INFO DAGScheduler: Registering RDD 14 (join at <stdin>:1)
17/02/06 14:02:23 INFO DAGScheduler: Got job 2 (runJob at PythonRDD.scala:393) with 1 output partitions
17/02/06 14:02:23 INFO DAGScheduler: Final stage: ResultStage 5 (runJob at PythonRDD.scala:393)
17/02/06 14:02:23 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 4)
17/02/06 14:02:23 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 4)
17/02/06 14:02:23 INFO DAGScheduler: Submitting ShuffleMapStage 4 (PairwiseRDD[14] at join at <stdin>:1), which has no missing parents
17/02/06 14:02:23 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 12.8 KB, free 774.4 KB)
17/02/06 14:02:23 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 6.9 KB, free 781.2 KB)
17/02/06 14:02:23 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:51749 (size: 6.9 KB, free: 511.1 MB)
17/02/06 14:02:23 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1008
17/02/06 14:02:23 INFO DAGScheduler: Submitting 8 missing tasks from ShuffleMapStage 4 (PairwiseRDD[14] at join at <stdin>:1)
17/02/06 14:02:23 INFO TaskSchedulerImpl: Adding task set 4.0 with 8 tasks
17/02/06 14:02:23 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 6, localhost, partition 0,NODE_LOCAL, 1992 bytes)
17/02/06 14:02:23 INFO TaskSetManager: Starting task 1.0 in stage 4.0 (TID 7, localhost, partition 1,NODE_LOCAL, 1992 bytes)
17/02/06 14:02:23 INFO TaskSetManager: Starting task 2.0 in stage 4.0 (TID 8, localhost, partition 2,NODE_LOCAL, 1992 bytes)
17/02/06 14:02:23 INFO TaskSetManager: Starting task 3.0 in stage 4.0 (TID 9, localhost, partition 3,NODE_LOCAL, 1992 bytes)
17/02/06 14:02:23 INFO Executor: Running task 0.0 in stage 4.0 (TID 6)
17/02/06 14:02:23 INFO Executor: Running task 1.0 in stage 4.0 (TID 7)
17/02/06 14:02:23 INFO Executor: Running task 2.0 in stage 4.0 (TID 8)
17/02/06 14:02:23 INFO Executor: Running task 3.0 in stage 4.0 (TID 9)
17/02/06 14:02:23 INFO ShuffleBlockFetcherIterator: Getting 4 non-empty blocks out of 4 blocks
17/02/06 14:02:23 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
17/02/06 14:02:23 INFO ShuffleBlockFetcherIterator: Getting 4 non-empty blocks out of 4 blocks
17/02/06 14:02:23 INFO ShuffleBlockFetcherIterator: Getting 4 non-empty blocks out of 4 blocks
17/02/06 14:02:23 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/06 14:02:23 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/06 14:02:23 INFO ShuffleBlockFetcherIterator: Getting 4 non-empty blocks out of 4 blocks
17/02/06 14:02:23 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
17/02/06 14:02:23 INFO PythonRunner: Times: total = 48, boot = 3, init = 4, finish = 41
17/02/06 14:02:23 INFO PythonRunner: Times: total = 51, boot = 2, init = 3, finish = 46
17/02/06 14:02:23 INFO PythonRunner: Times: total = 53, boot = 2, init = 2, finish = 49
17/02/06 14:02:23 INFO PythonRunner: Times: total = 63, boot = 3, init = 7, finish = 53
17/02/06 14:02:23 INFO PythonRunner: Times: total = 123, boot = 3, init = 11, finish = 109
17/02/06 14:02:23 INFO PythonRunner: Times: total = 121, boot = 3, init = 11, finish = 107
17/02/06 14:02:23 INFO Executor: Finished task 0.0 in stage 4.0 (TID 6). 1445 bytes result sent to driver
17/02/06 14:02:23 INFO PythonRunner: Times: total = 130, boot = 2, init = 15, finish = 113
17/02/06 14:02:23 INFO Executor: Finished task 1.0 in stage 4.0 (TID 7). 1445 bytes result sent to driver
17/02/06 14:02:23 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 6) in 167 ms on localhost (1/8)
17/02/06 14:02:23 INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID 7) in 170 ms on localhost (2/8)
17/02/06 14:02:23 INFO Executor: Finished task 3.0 in stage 4.0 (TID 9). 1445 bytes result sent to driver
17/02/06 14:02:23 INFO TaskSetManager: Finished task 3.0 in stage 4.0 (TID 9) in 175 ms on localhost (3/8)
17/02/06 14:02:23 INFO PythonRunner: Times: total = 134, boot = 2, init = 21, finish = 111
17/02/06 14:02:23 INFO Executor: Finished task 2.0 in stage 4.0 (TID 8). 1445 bytes result sent to driver
17/02/06 14:02:23 INFO TaskSetManager: Finished task 2.0 in stage 4.0 (TID 8) in 183 ms on localhost (4/8)

Please suggest.

Thanks,
Shitansu.

@shubhaprasadsamal, please refer to the post "Order and order_items Join RDD hangs in Bigdata labs". Is it a known issue?

@shubhaprasadsamal - I think you can run "orderItemsParsedRDD.reduceByKey(lambda acc, value: (acc + value))" on its own instead of assigning it to a variable. If you do assign it, then after assigning, just type the variable name "orderItemsAgg", press Enter, and see if it works.
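In the pyspark shell that would look something like the sketch below (the exact RDD id and repr will vary):

orderItemsAgg = orderItemsParsedRDD.reduceByKey(lambda acc, value: acc + value)
orderItemsAgg            # evaluating the name only prints the RDD description, e.g. PythonRDD[15] at RDD at PythonRDD.scala:43
orderItemsAgg.take(5)    # an action such as take() is what actually returns records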

Thanks

@SreeswethaGolla, reduceByKey() returns an RDD; it is not like reduce(), which is an action that returns a single value.
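A minimal contrast, using made-up numbers:

pairs = sc.parallelize([(1, 10.0), (1, 5.0), (2, 7.0)])
pairs.reduceByKey(lambda a, b: a + b)            # transformation: returns a new RDD, nothing is computed yet
pairs.reduceByKey(lambda a, b: a + b).collect()  # action: returns [(1, 15.0), (2, 7.0)]
pairs.values().reduce(lambda a, b: a + b)        # reduce() is an action: returns the plain value 22.0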

@email2dgk, thanks for the correction. Sorry, I got confused between the two.

If you are running in local mode, the job can hang due to limited resources. Run it in YARN mode and it should complete properly.
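For example, relaunch the shell against YARN; the exact options depend on the lab's Spark version and queue setup, but on Spark 1.x something like this should work:

pyspark --master yarn-client --num-executors 2 --executor-memory 512M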