PySpark filter issue

pyspark
#1

Hi,

I am trying to get the data for order id 2 with the code below, but I am getting an error.

ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
orderItemsRDD = sc.textFile("/user/cloudera/sqoop_import/order_items")
# keep CANCELED orders as (order_id, full record)
ordersParsedRDD = ordersRDD.filter(lambda rec: rec.split(",")[3] in "CANCELED").map(lambda rec: (int(rec.split(",")[0]), rec))
# (order_id, item subtotal)
orderItemsParsedRDD = orderItemsRDD.map(lambda rec: (int(rec.split(",")[1]), float(rec.split(",")[4])))
# total per order_id
orderItemsAgg = orderItemsParsedRDD.reduceByKey(lambda acc, value: (acc + value))

for i in orderItemsAgg.filter( lambda l : l.split(",")[0] ==2):print(i)

please clarify.


#2

Try:
for i in orderItemsAgg.filter(lambda l: l[0] == 2): print(i)

split() is a string method, but orderItemsAgg contains (int, float) tuples, not strings, so index into the tuple instead of splitting.
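
For example, a minimal sketch with toy values (assuming a SparkContext sc is available, as in the pyspark shell):

pairs = sc.parallelize([(2, 100.25), (2, 150.25), (4, 49.5)])
agg = pairs.reduceByKey(lambda acc, value: acc + value)
# each record is a (key, value) tuple, so index it instead of splitting
print(agg.filter(lambda rec: rec[0] == 2).collect())
# [(2, 250.5)]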


#3

It says 'PipelinedRDD' object is not iterable.


#4

Use the following code; it works:

orderItemsAggFilter = orderItemsAgg.filter(lambda rec: rec[0] == 2)

for i in orderItemsAggFilter.collect(): print(i)

Result
(2, 579.98000000000002)


#5

Hi @N_Chakote: It's working, thanks. But I am not able to find the reason why it's failing for:
for i in orderItemsAgg.filter(lambda rec: (rec[0] == 2)).collect: print(i)

Why is it asking for another variable?

Could you please clarify?


#6

Did you call collect as a function, i.e. collect()?

I think it should work then. The only problem I could see in your initial code was that you need to collect() the RDD into a Python list before you can iterate over it with a for loop.
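
For instance, a quick sketch with toy data (again assuming sc from the shell):

rdd = sc.parallelize([(1, 10.0), (2, 20.0)])
# for i in rdd: print(i)           # fails: an RDD object itself is not iterable
for i in rdd.collect(): print(i)   # works: collect() returns a Python list
for i in rdd.take(1): print(i)     # take(n) also returns a list; safer for large RDDs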


#7

What error are you getting?


#8

It says 'PipelinedRDD' object is not iterable.


#9

This one is working for me:

for i in orderItemsAgg.filter(lambda rec: (rec[0] == 2)).collect(): print(i)
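
For reference, here is the whole pipeline from the first post with the fixes applied; a sketch assuming the same paths and column layout as above. It also uses == "CANCELED" rather than in "CANCELED", since x in "CANCELED" is a substring test, not an equality check:

ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
orderItemsRDD = sc.textFile("/user/cloudera/sqoop_import/order_items")
ordersParsedRDD = ordersRDD.filter(lambda rec: rec.split(",")[3] == "CANCELED").map(lambda rec: (int(rec.split(",")[0]), rec))
orderItemsParsedRDD = orderItemsRDD.map(lambda rec: (int(rec.split(",")[1]), float(rec.split(",")[4])))
orderItemsAgg = orderItemsParsedRDD.reduceByKey(lambda acc, value: acc + value)
# tuple indexing plus collect() with parentheses
for i in orderItemsAgg.filter(lambda rec: rec[0] == 2).collect(): print(i)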
