sortByKey for Spark RDD fails


#1

Hello Team,

While going through one of the demo videos on sortByKey() for composite keys, I tried it on the labs with the available '/public/retail_db/products' data, using the code snippet below:

products=sc.textFile('/public/retail_db/products')
products_map=products.filter(lambda p: p.split(',')!='').map(lambda p:((int(p.split(',')[1]),float(p.split(',')[4])),p))

# The mapping works and the output is:

products_map.first()
((2, 59.98), u'1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy')

Even the data types of the composite keys are OK:

>>> type(products_map.first()[0][0])

<type 'int'>

>>> type(products_map.first()[0][1])
<type 'float'>

But running the code below fails with an error:

Code:
products_map.sortByKey().take(10)

Error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):

File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/worker.py", line 111, in main
process()
File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/rdd.py", line 2346, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/rdd.py", line 2346, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/rdd.py", line 2346, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/rdd.py", line 317, in func
return f(iterator)
File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/rdd.py", line 1004, in
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/rdd.py", line 1004, in
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "", line 1, in
ValueError: could not convert string to float:

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
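
For reference, a minimal sketch (same path and field positions as in the snippet above) that should surface the records on which the float() conversion fails; the helper name price_is_bad is just for illustration:

def price_is_bad(line):
    # True for rows whose 5th field (index 4) is missing or not a number.
    try:
        float(line.split(',')[4])
        return False
    except (ValueError, IndexError):
        return True

# 'products' is the raw textFile RDD defined above.
products.filter(price_is_bad).take(5)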

Please suggest.


#2

Hello team,

Any update?

Regards


#3

There are some bad records in '/public/retail_db/products'.

Try this:

products_map = products.filter(lambda p: p.split(',')[4] != "").map(lambda p: ((int(p.split(',')[1]), float(p.split(',')[4])), p))
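
For context, the original filter never drops anything, because p.split(',') returns a list and a list is never equal to the empty string. A minimal sketch of the full pipeline with the corrected filter (same path and field positions as in the question):

products = sc.textFile('/public/retail_db/products')

# Keep only rows whose price field (index 4) is non-empty, then build the
# composite key (second field as int, fifth field as float) and sort by it.
products_map = products.filter(lambda p: p.split(',')[4] != '') \
    .map(lambda p: ((int(p.split(',')[1]), float(p.split(',')[4])), p))

products_map.sortByKey().take(10)

Only the fields that are actually converted (index 1 and index 4) need guarding; the empty description at index 3 is never parsed.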


#4

But why is the filter on position 4 only?

The data, after applying the filter and the sort, still shows the blank field:

((2, 29.97), u"18,2,Reebok Men's Full Zip Training Jacket,,29.97,http://images.acmesports.sports/Reebok+Men%27s+Full+Zip+Training+Jacket")

Can you please explain the logic?


#5

The blank field is at position 3, counting from 0.
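
To make the positions concrete, here is a small sketch that splits the record quoted above and prints the relevant indexes (the record text is copied from the previous post; the variable names are just for illustration):

line = u"18,2,Reebok Men's Full Zip Training Jacket,,29.97,http://images.acmesports.sports/Reebok+Men%27s+Full+Zip+Training+Jacket"
fields = line.split(',')
print(repr(fields[3]))  # u'' -- the empty description field
print(repr(fields[4]))  # u'29.97' -- the price, which float() parses fine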


#6

Hello team,

Any update, please?

Regards


#7

I would suggest importing the products table again, but with Sqoop, using the Cloudera VM.


#8

There is one bad record in the file. It looks like this:
((2, 29.97), u"18,2,Reebok Men's Full Zip, Training Jacket,29.97,http://images.acmesports.sports/Reebok+Men's+Full+Zip+Training+Jacket")

So here, the field after the 4th comma is empty. You can't perform operations on the key if it is null.
It is an issue in the data.
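
If it helps to confirm, a small sketch (same field positions as discussed above; bad_records is just an illustration name) that surfaces every record whose price field is empty or missing, so you can see exactly which rows break float():

# 'products' is the raw textFile RDD from the original post.
bad_records = products.filter(
    lambda p: len(p.split(',')) <= 4 or p.split(',')[4] == '')
print(bad_records.count())
bad_records.take(5)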


#9

I can see that. But my question is: shouldn't the filter logic be (lambda p: p.split(',')[3] != "") if the blank appears after the 4th comma?

Regards