Pyspark - Filtering data

I am trying to get a range of records (with order_customer_id == 5 and order_customer_id == 100) from the orders table using the filter function. The command executes, but no output comes up. I have also tried using <= and >= conditions, but no luck.
Please help if I am doing something wrong.

(Index 2 of each comma-delimited record is order_customer_id.)

Expected output:

+----------+---------------------+-------------------+-----------------+
| order_id | order_date          | order_customer_id | order_status    |
+----------+---------------------+-------------------+-----------------+
| 36472    | 2014-03-06 00:00:00 | 5                 | PROCESSING      |
| 13705    | 2013-10-18 00:00:00 | 5                 | COMPLETE        |
| 45832    | 2014-05-05 00:00:00 | 5                 | PENDING_PAYMENT |
| 41333    | 2014-04-05 00:00:00 | 5                 | COMPLETE        |
| 62907    | 2014-02-06 00:00:00 | 100               | PENDING_PAYMENT |
| 54995    | 2014-07-08 00:00:00 | 100               | COMPLETE        |
| 6641     | 2013-09-05 00:00:00 | 100               | COMPLETE        |
| 28477    | 2014-01-16 00:00:00 | 100               | COMPLETE        |
| 22395    | 2013-12-09 00:00:00 | 100               | CANCELED        |
| 15045    | 2013-10-28 00:00:00 | 100               | PROCESSING      |
| 64426    | 2014-04-06 00:00:00 | 100               | PENDING         |
+----------+---------------------+-------------------+-----------------+

Command:

for i in ordersRDD.filter(lambda rec:
        int(rec.split(",")[2]) == 5
        and int(rec.split(",")[2]) == 100).take(3):
    print(i)

Output:

17/02/15 12:53:36 INFO spark.SparkContext: Starting job: runJob at PythonRDD.scala:393
17/02/15 12:53:36 INFO scheduler.DAGScheduler: Got job 60 (runJob at PythonRDD.scala:393) with 1 output partitions
17/02/15 12:53:36 INFO scheduler.DAGScheduler: Final stage: ResultStage 86 (runJob at PythonRDD.scala:393)
17/02/15 12:53:36 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/02/15 12:53:36 INFO scheduler.DAGScheduler: Missing parents: List()
17/02/15 12:53:36 INFO scheduler.DAGScheduler: Submitting ResultStage 86 (PythonRDD[97] at RDD at PythonRDD.scala:43), which has no missing parents
17/02/15 12:53:36 INFO storage.MemoryStore: Block broadcast_70 stored as values in memory (estimated size 5.3 KB, free 721.2 KB)
17/02/15 12:53:36 INFO storage.MemoryStore: Block broadcast_70_piece0 stored as bytes in memory (estimated size 3.4 KB, free 724.6 KB)
17/02/15 12:53:36 INFO storage.BlockManagerInfo: Added broadcast_70_piece0 in memory on 192.168.142.128:43225 (size: 3.4 KB, free: 530.2 MB)
17/02/15 12:53:36 INFO spark.SparkContext: Created broadcast 70 from broadcast at DAGScheduler.scala:1006
17/02/15 12:53:36 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 86 (PythonRDD[97] at RDD at PythonRDD.scala:43)
17/02/15 12:53:36 INFO cluster.YarnScheduler: Adding task set 86.0 with 1 tasks
17/02/15 12:53:36 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 86.0 (TID 202, quickstart.cloudera, partition 0,NODE_LOCAL, 2180 bytes)
17/02/15 12:53:36 INFO storage.BlockManagerInfo: Added broadcast_70_piece0 in memory on quickstart.cloudera:52587 (size: 3.4 KB, free: 530.2 MB)
17/02/15 12:53:37 INFO scheduler.DAGScheduler: ResultStage 86 (runJob at PythonRDD.scala:393) finished in 0.152 s
17/02/15 12:53:37 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 86.0 (TID 202) in 151 ms on quickstart.cloudera (1/1)
17/02/15 12:53:37 INFO scheduler.DAGScheduler: Job 60 finished: runJob at PythonRDD.scala:393, took 0.162028 s
17/02/15 12:53:37 INFO cluster.YarnScheduler: Removed TaskSet 86.0, whose tasks have all completed, from pool
17/02/15 12:53:37 INFO spark.SparkContext: Starting job: runJob at PythonRDD.scala:393
17/02/15 12:53:37 INFO scheduler.DAGScheduler: Got job 61 (runJob at PythonRDD.scala:393) with 3 output partitions
17/02/15 12:53:37 INFO scheduler.DAGScheduler: Final stage: ResultStage 87 (runJob at PythonRDD.scala:393)
17/02/15 12:53:37 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/02/15 12:53:37 INFO scheduler.DAGScheduler: Missing parents: List()
17/02/15 12:53:37 INFO scheduler.DAGScheduler: Submitting ResultStage 87 (PythonRDD[98] at RDD at PythonRDD.scala:43), which has no missing parents
17/02/15 12:53:37 INFO storage.MemoryStore: Block broadcast_71 stored as values in memory (estimated size 5.3 KB, free 729.9 KB)
17/02/15 12:53:37 INFO storage.MemoryStore: Block broadcast_71_piece0 stored as bytes in memory (estimated size 3.4 KB, free 733.3 KB)
17/02/15 12:53:37 INFO storage.BlockManagerInfo: Added broadcast_71_piece0 in memory on 192.168.142.128:43225 (size: 3.4 KB, free: 530.2 MB)
17/02/15 12:53:37 INFO spark.SparkContext: Created broadcast 71 from broadcast at DAGScheduler.scala:1006
17/02/15 12:53:37 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 87 (PythonRDD[98] at RDD at PythonRDD.scala:43)
17/02/15 12:53:37 INFO cluster.YarnScheduler: Adding task set 87.0 with 3 tasks
17/02/15 12:53:37 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 87.0 (TID 203, quickstart.cloudera, partition 1,NODE_LOCAL, 2180 bytes)
17/02/15 12:53:37 INFO storage.BlockManagerInfo: Added broadcast_71_piece0 in memory on quickstart.cloudera:52587 (size: 3.4 KB, free: 530.2 MB)
17/02/15 12:53:37 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 87.0 (TID 204, quickstart.cloudera, partition 2,NODE_LOCAL, 2180 bytes)
17/02/15 12:53:37 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 87.0 (TID 203) in 114 ms on quickstart.cloudera (1/3)
17/02/15 12:53:37 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 87.0 (TID 205, quickstart.cloudera, partition 3,NODE_LOCAL, 2180 bytes)
17/02/15 12:53:37 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 87.0 (TID 204) in 110 ms on quickstart.cloudera (2/3)
17/02/15 12:53:37 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 87.0 (TID 205) in 101 ms on quickstart.cloudera (3/3)
17/02/15 12:53:37 INFO cluster.YarnScheduler: Removed TaskSet 87.0, whose tasks have all completed, from pool
17/02/15 12:53:37 INFO scheduler.DAGScheduler: ResultStage 87 (runJob at PythonRDD.scala:393) finished in 0.313 s
17/02/15 12:53:37 INFO scheduler.DAGScheduler: Job 61 finished: runJob at PythonRDD.scala:393, took 0.321881 s

Thanks in advance,
Swetha.

@SreeswethaGolla - You have to use an "or" condition: a single record cannot have order_customer_id equal to both 5 and 100 at the same time, so filtering with "and" matches nothing.

orderRDD = sc.textFile("/user/gnanaprakasam/sqoop_import/orders")
# keep records whose order_customer_id (field index 2) is 5 or 100
orderFilterRDD = orderRDD.filter(lambda rec: int(rec.split(",")[2]) == 5 or int(rec.split(",")[2]) == 100)
for i in orderFilterRDD.take(3): print(i)
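
For reference, an equivalent version that parses each record only once, using Python's "in" membership test; same logic as above (the helper name is illustrative, not from the original post):

def is_target_customer(rec):
    # parse the comma-delimited record once; field 2 is order_customer_id
    customer_id = int(rec.split(",")[2])
    # keep the record if the customer id is either 5 or 100
    return customer_id in (5, 100)

for i in orderRDD.filter(is_target_customer).take(3):
    print(i)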

@gnanaprakasam, it worked. Thanks a lot. :)

Hi All,

I tried executing the join below in the itversity lab, but it has been 10 minutes already with no response and the cursor is not moving:

ordersJoinOrderItems = orderItemsParsedRDD.join(orderItemsRDD)
17/02/20 00:27:35 INFO BlockManagerInfo: Removed broadcast_5_piece0 on localhost:54463 in memory (size: 3.1 KB, free: 511.0 MB)
17/02/20 00:27:35 INFO ContextCleaner: Cleaned accumulator 3
17/02/20 00:27:35 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:54463 in memory (size: 3.1 KB, free: 511.0 MB)
17/02/20 00:27:35 INFO ContextCleaner: Cleaned accumulator 2

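
One thing worth checking: join on RDDs in PySpark operates on pair RDDs of (key, value) tuples, so both inputs must be keyed by the join column before calling join. A minimal sketch, assuming the usual retail_db layout where order_id is field 0 of orders and field 1 of order_items (the paths and variable names here are illustrative, not taken from the post above):

# both sides must be (key, value) pairs; key them by order_id first
orders = sc.textFile("/user/cloudera/sqoop_import/orders")            # illustrative path
orderItems = sc.textFile("/user/cloudera/sqoop_import/order_items")   # illustrative path

# (order_id, full record): order_id is field 0 of orders
ordersParsed = orders.map(lambda rec: (int(rec.split(",")[0]), rec))
# (order_id, full record): order_item_order_id is field 1 of order_items
orderItemsParsed = orderItems.map(lambda rec: (int(rec.split(",")[1]), rec))

ordersJoinOrderItems = ordersParsed.join(orderItemsParsed)
for i in ordersJoinOrderItems.take(3):
    print(i)

Note also that join is a transformation, so the assignment itself is lazy and should return immediately; no work starts until an action such as take() is called. The INFO lines shown above are background cleanup messages, and pressing Enter usually brings the prompt back.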