PySpark filter issue

Hi,

I am trying to get the data for order id 2 with the code below, but I am getting an error.

ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
orderItemsRDD = sc.textFile("/user/cloudera/sqoop_import/order_items")
ordersParsedRDD = ordersRDD.filter(lambda rec: rec.split(",")[3] in "CANCELED").map(lambda rec: (int(rec.split(",")[0]), rec))
orderItemsParsedRDD = orderItemsRDD.map(lambda rec: (int(rec.split(",")[1]), float(rec.split(",")[4])))
orderItemsAgg = orderItemsParsedRDD.reduceByKey(lambda acc, value: (acc + value))

for i in orderItemsAgg.filter( lambda l : l.split(",")[0] ==2):print(i)

Please clarify.

Try
for i in orderItemsAgg.filter(lambda l: l[0] == 2): print(i)

split() is a string method, but orderItemsAgg contains (int, float) tuples, so calling split on its records fails.
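To see the difference concretely, here is a plain-Python sketch (the sample pairs are made up; they just stand in for the aggregated RDD's contents):

```python
# After reduceByKey, each record is a (key, total) tuple, not a CSV string.
# Made-up pairs standing in for orderItemsAgg's contents:
pairs = [(1, 299.98), (2, 579.98), (3, 199.99)]

# l.split(",") fails because tuples have no split method:
try:
    [l for l in pairs if l.split(",")[0] == 2]
except AttributeError as e:
    print(e)  # 'tuple' object has no attribute 'split'

# Index into the tuple instead:
print([l for l in pairs if l[0] == 2])  # [(2, 579.98)]
```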

It says PipelinedRDD is not iterable.

Use the following code; it works:

orderItemsAggFilter = orderItemsAgg.filter(lambda rec: (rec[0] == 2))

for i in orderItemsAggFilter.collect(): print(i)

Result
(2, 579.98000000000002)


Hi @N_Chakote: It's working, thanks. But I am not able to find the reason why it fails for
for i in orderItemsAgg.filter(lambda rec: (rec[0] == 2)).collect: print(i)

Why is it asking for another variable?

Could you please clarify?

Did you call collect as a function, i.e. collect()?

I think it should work. The only problem I could see in your initial code was that you need to collect the RDD before you can iterate over it with a for loop.
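The error in the thread comes from the missing parentheses: without them, `.collect` is just a bound method object, not the collected list. A plain-Python stand-in class (not real PySpark) shows the same behavior:

```python
# A tiny stand-in class: collect() returns a list, like an RDD's collect().
class FakeRDD:
    def __init__(self, data):
        self._data = data

    def collect(self):
        return list(self._data)

rdd = FakeRDD([(2, 579.98)])

# Without parentheses, rdd.collect is a bound method, which a
# for loop cannot iterate over:
try:
    for i in rdd.collect:
        print(i)
except TypeError as e:
    print(e)  # a bound method is not iterable

# Calling it returns a list the for loop can walk:
for i in rdd.collect():
    print(i)
```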

What error are you getting?

It says PipelinedRDD is not iterable.

This one is working for me:

for i in orderItemsAgg.filter(lambda rec: (rec[0] == 2)).collect(): print(i)
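For reference, the whole pipeline can be mimicked in plain Python (the order_items lines below are made-up samples in the same CSV layout, chosen so that order 2 sums to the total shown earlier in the thread):

```python
# Made-up sample lines in the order_items layout:
# id, order_id, product_id, qty, subtotal, price
order_items = [
    "1,1,957,1,299.98,299.98",
    "2,2,1073,1,199.99,199.99",
    "3,2,502,5,250.00,50.00",
    "4,2,403,1,129.99,129.99",
]

# map step: (order_id, subtotal)
parsed = [(int(r.split(",")[1]), float(r.split(",")[4])) for r in order_items]

# reduceByKey step: sum subtotals per order_id
totals = {}
for k, v in parsed:
    totals[k] = totals.get(k, 0.0) + v

# filter + print, as in the working snippet above
for pair in totals.items():
    if pair[0] == 2:
        print(pair)  # order 2's total, ~579.98
```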