Orders and order_items RDD join hangs in Big Data Labs, is it a known issue?

I tried to execute the following in the pyspark shell and also as a .py program, but both of them hang with no further response after reaching the join().

from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[2]").setAppName("join.py")
sc = SparkContext(conf=conf)

ordersFile = "/user/email2dgk/review/mysql/data/retail_db/orders"
orderItemsFile = "/user/email2dgk/review/mysql/data/retail_db/order_items"

ordersRdd = sc.textFile(ordersFile).map(lambda x: x.split(","))
ordersKvRdd = ordersRdd.map(lambda x: (str(x[1]), 1))
ordersPairRdd = ordersRdd.map(lambda x: (str(x[1]), int(x[0])))

ordersKvTotalRdd = ordersKvRdd.reduceByKey(lambda x,y: x+y)

orderItemsRdd = sc.textFile(orderItemsFile).map(lambda x: x.split(","))

# sum of subtotal per order_item_order_id
orderItemsKvRdd = orderItemsRdd.map(lambda x: (int(x[1]), float(x[4])))
orderItemsKvTotalRdd = orderItemsKvRdd.reduceByKey(lambda x,y: x+y)

orderItemsKvTotalRdd.filter(lambda (x,y): x == 2).collect()

ordersKeysRdd = ordersPairRdd.map(lambda (x,y): (y,(y,x)))
orderItemsKeysRdd = orderItemsKvTotalRdd.map(lambda (x,y): (x,(x,y)))
joinedRdd = orderItemsKeysRdd.join(ordersKeysRdd)

for i in joinedRdd.take(5):
    print i

sc.stop()

For your information, I also tried it for one particular order_item_order_id, i.e. 5502, which has 5 records in order_items and one record in orders respectively.
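As a quick sanity check against the RDDs defined above, something like the following (a sketch; the id 5502 and the column positions are the ones used in the code above) should confirm those counts:

ordersRdd.filter(lambda x: int(x[0]) == 5502).count()        # orders: expect 1 record
orderItemsRdd.filter(lambda x: int(x[1]) == 5502).count()    # order_items: expect 5 records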

The cluster might be busy at that time. You can go to the Resource Manager and see if there are too many jobs running.

@itversity, experiencing the same issue.

I think in local mode it is not getting enough resources. Try using yarn:

conf = SparkConf().setMaster("yarn").setAppName("join.py")

@itversity Let me try this once big-data-labs is up

It has been up and running for the last 16 hours.

@itversity No, I have been trying since 4:00 PM EST; it was down then and it is still down now. Not sure whether someone has already reported it.

@itversity Just noticed that bigdata-labs.com redirects to labs.itversity.com, so bigdata-labs.com does not exist anymore, does it?

Yes, eventually everything will go under *.itversity.com :slight_smile:

@itversity Sure, but it would be great if you could communicate this in a banner as well. Not sure whether everyone is aware of this.

@itversity By the way, spark-submit --master yarn join.py helped…

However, if I invoke it with setMaster("yarn") it fails with the error message below.

py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Could not parse Master URL: 'yarn'
at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2759)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:522)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:214)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
at py4j.GatewayConnection.run(GatewayConnection.java:209)

Try yarn-client instead of yarn in setMaster.
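For reference, a minimal sketch of that change (assuming a Spark 1.x setup, where yarn-client / yarn-cluster are the accepted master URLs):

conf = SparkConf().setMaster("yarn-client").setAppName("join.py")
sc = SparkContext(conf=conf)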

@itversity, that helped.

@itversity,
I'm trying to execute the steps below:
ordersRDD = sc.textFile("/user/kjanakijanu/sqoop_import/orders")
orderItemsRDD = sc.textFile("/user/kjanakijanu/sqoop_import/order_items")

ordersfilterRDD = ordersRDD. \
    filter(lambda rec: rec.split(",")[3] in "CANCELED"). \
    map(lambda rec: (int(rec.split(",")[0]), rec))
orderItemsParsedRDD = orderItemsRDD. \
    map(lambda rec: (int(rec.split(",")[1]), float(rec.split(",")[4])))
orderItemsAgg = orderItemsParsedRDD. \
    reduceByKey(lambda acc, value: (acc + value))

ordersJoinOrderItems = orderItemsAgg.join(ordersfilterRDD)

for i in ordersJoinOrderItems.filter(lambda rec: rec[1][0] >= 1000).take(5):
    print(i)

When I try to view the data after joining with

for i in ordersJoinOrderItems.take(5): print(i)

it hangs after some execution.

Can you help me figure out why it is not executing completely?

Launch spark-shell in yarn mode: spark-shell --master yarn-client
By default it is launched in local mode and does not get enough resources.
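For example (the same flag applies to the Python shell and to submitted scripts, as used earlier in this thread):

pyspark --master yarn-client
spark-submit --master yarn-client join.py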