PySpark reduce(lambda ..) Error

pyspark
mapreduce

#1

I start PySpark with pyspark --master yarn-client (or even standalone).
Everything works until I try reduce(lambda x: x+y), and then I get this strange error:

temp_c = [10, 3, -5, 25, 1, 9, 29, -10, 5]
rdd_temp_c = sc.parallelize(temp_c)
rdd_temp_K = rdd_temp_c.map(lambda x: x + 273.15).collect()
print(rdd_temp_K)
rdd_temp_k2 = rdd_temp_c.map(lambda x: x + 273.15)
rdd_temp_k2.reduce(lambda x:x+y)

rdd_temp_k2.reduce(lambda x:x+y)
17/12/30 18:38:08 INFO SparkContext: Starting job: reduce at <stdin>:1
17/12/30 18:38:08 INFO DAGScheduler: Got job 11 (reduce at <stdin>:1) with 8 output partitions
17/12/30 18:38:08 INFO DAGScheduler: Final stage: ResultStage 11 (reduce at <stdin>:1)
17/12/30 18:38:08 INFO DAGScheduler: Parents of final stage: List()
17/12/30 18:38:08 INFO DAGScheduler: Missing parents: List()
17/12/30 18:38:08 INFO DAGScheduler: Submitting ResultStage 11 (PythonRDD[16] at reduce at <stdin>:1), which has no missing parents
17/12/30 18:38:08 INFO MemoryStore: Block broadcast_13 stored as values in memory (estimated size 4.0 KB, free 750.7 KB)
17/12/30 18:38:08 INFO MemoryStore: Block broadcast_13_piece0 stored as bytes in memory (estimated size 2.7 KB, free 753.4 KB)
17/12/30 18:38:08 INFO BlockManagerInfo: Added broadcast_13_piece0 in memory on localhost:39859 (size: 2.7 KB, free: 511.1 MB)
17/12/30 18:38:08 INFO SparkContext: Created broadcast 13 from broadcast at DAGScheduler.scala:1008
17/12/30 18:38:08 INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 11 (PythonRDD[16] at reduce at <stdin>:1)
17/12/30 18:38:08 INFO TaskSchedulerImpl: Adding task set 11.0 with 8 tasks
17/12/30 18:38:08 INFO TaskSetManager: Starting task 0.0 in stage 11.0 (TID 29, localhost, partition 0,PROCESS_LOCAL, 2083 bytes)
17/12/30 18:38:08 INFO TaskSetManager: Starting task 1.0 in stage 11.0 (TID 30, localhost, partition 1,PROCESS_LOCAL, 2083 bytes)
17/12/30 18:38:08 INFO TaskSetManager: Starting task 2.0 in stage 11.0 (TID 31, localhost, partition 2,PROCESS_LOCAL, 2086 bytes)
17/12/30 18:38:08 INFO TaskSetManager: Starting task 3.0 in stage 11.0 (TID 32, localhost, partition 3,PROCESS_LOCAL, 2083 bytes)
17/12/30 18:38:08 INFO TaskSetManager: Starting task 4.0 in stage 11.0 (TID 33, localhost, partition 4,PROCESS_LOCAL, 2083 bytes)
17/12/30 18:38:08 INFO TaskSetManager: Starting task 5.0 in stage 11.0 (TID 34, localhost, partition 5,PROCESS_LOCAL, 2083 bytes)
17/12/30 18:38:08 INFO TaskSetManager: Starting task 6.0 in stage 11.0 (TID 35, localhost, partition 6,PROCESS_LOCAL, 2083 bytes)
17/12/30 18:38:08 INFO TaskSetManager: Starting task 7.0 in stage 11.0 (TID 36, localhost, partition 7,PROCESS_LOCAL, 2105 bytes)
17/12/30 18:38:08 INFO Executor: Running task 2.0 in stage 11.0 (TID 31)
17/12/30 18:38:08 INFO Executor: Running task 5.0 in stage 11.0 (TID 34)
17/12/30 18:38:08 INFO Executor: Running task 1.0 in stage 11.0 (TID 30)
17/12/30 18:38:08 INFO Executor: Running task 0.0 in stage 11.0 (TID 29)
17/12/30 18:38:08 INFO Executor: Running task 6.0 in stage 11.0 (TID 35)
17/12/30 18:38:08 INFO Executor: Running task 3.0 in stage 11.0 (TID 32)
17/12/30 18:38:08 INFO Executor: Running task 7.0 in stage 11.0 (TID 36)
17/12/30 18:38:08 INFO Executor: Running task 4.0 in stage 11.0 (TID 33)
17/12/30 18:38:08 INFO PythonRunner: Times: total = 11, boot = 2, init = 9, finish = 0
17/12/30 18:38:08 INFO PythonRunner: Times: total = 11, boot = 4, init = 7, finish = 0
17/12/30 18:38:08 INFO Executor: Finished task 4.0 in stage 11.0 (TID 33). 1002 bytes result sent to driver
17/12/30 18:38:08 INFO Executor: Finished task 3.0 in stage 11.0 (TID 32). 1002 bytes result sent to driver
17/12/30 18:38:08 INFO TaskSetManager: Finished task 4.0 in stage 11.0 (TID 33) in 25 ms on localhost (1/8)
17/12/30 18:38:08 INFO TaskSetManager: Finished task 3.0 in stage 11.0 (TID 32) in 26 ms on localhost (2/8)
17/12/30 18:38:08 INFO PythonRunner: Times: total = 45, boot = -16285, init = 16330, finish = 0
17/12/30 18:38:08 INFO PythonRunner: Times: total = 45, boot = -16301, init = 16346, finish = 0
17/12/30 18:38:08 INFO Executor: Finished task 5.0 in stage 11.0 (TID 34). 1002 bytes result sent to driver
17/12/30 18:38:08 INFO Executor: Finished task 0.0 in stage 11.0 (TID 29). 1002 bytes result sent to driver
17/12/30 18:38:08 INFO PythonRunner: Times: total = 45, boot = -16273, init = 16318, finish = 0
17/12/30 18:38:08 INFO Executor: Finished task 2.0 in stage 11.0 (TID 31). 1002 bytes result sent to driver
17/12/30 18:38:08 INFO TaskSetManager: Finished task 5.0 in stage 11.0 (TID 34) in 53 ms on localhost (3/8)
17/12/30 18:38:08 INFO TaskSetManager: Finished task 0.0 in stage 11.0 (TID 29) in 55 ms on localhost (4/8)
17/12/30 18:38:08 INFO TaskSetManager: Finished task 2.0 in stage 11.0 (TID 31) in 55 ms on localhost (5/8)
17/12/30 18:38:08 INFO PythonRunner: Times: total = 48, boot = -16258, init = 16306, finish = 0
17/12/30 18:38:08 INFO Executor: Finished task 6.0 in stage 11.0 (TID 35). 1002 bytes result sent to driver
17/12/30 18:38:08 INFO PythonRunner: Times: total = 48, boot = -16266, init = 16314, finish = 0
17/12/30 18:38:08 INFO Executor: Finished task 1.0 in stage 11.0 (TID 30). 1002 bytes result sent to driver
17/12/30 18:38:08 INFO TaskSetManager: Finished task 6.0 in stage 11.0 (TID 35) in 61 ms on localhost (6/8)
17/12/30 18:38:08 INFO TaskSetManager: Finished task 1.0 in stage 11.0 (TID 30) in 62 ms on localhost (7/8)
17/12/30 18:38:08 ERROR Executor: Exception in task 7.0 in stage 11.0 (TID 36)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/rdd.py", line 795, in func
yield reduce(f, iterator, initial)
TypeError: <lambda>() takes exactly 1 argument (2 given)

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/12/30 18:38:08 WARN TaskSetManager: Lost task 7.0 in stage 11.0 (TID 36, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/rdd.py", line 795, in func
yield reduce(f, iterator, initial)
TypeError: <lambda>() takes exactly 1 argument (2 given)

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/12/30 18:38:08 ERROR TaskSetManager: Task 7 in stage 11.0 failed 1 times; aborting job
17/12/30 18:38:08 INFO TaskSchedulerImpl: Removed TaskSet 11.0, whose tasks have all completed, from pool
17/12/30 18:38:08 INFO TaskSchedulerImpl: Cancelling stage 11
17/12/30 18:38:08 INFO DAGScheduler: ResultStage 11 (reduce at <stdin>:1) failed in 0.088 s
17/12/30 18:38:08 INFO DAGScheduler: Job 11 failed: reduce at <stdin>:1, took 0.093775 s
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/rdd.py", line 797, in reduce
vals = self.mapPartitions(func).collect()
File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/rdd.py", line 771, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/usr/hdp/2.5.0.0-1245/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/sql/utils.py", line 45, in deco
return f(*a, **kw)
File "/usr/hdp/2.5.0.0-1245/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 11.0 failed 1 times, most recent failure: Lost task 7.0 in stage 11.0 (TID 36, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/rdd.py", line 795, in func
yield reduce(f, iterator, initial)
TypeError: <lambda>() takes exactly 1 argument (2 given)

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1882)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1953)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
at org.apache.spark.rdd.RDD.collect(RDD.scala:933)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/rdd.py", line 795, in func
yield reduce(f, iterator, initial)
TypeError: <lambda>() takes exactly 1 argument (2 given)

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

#2

RDD.reduce expects a function of two arguments (the running accumulator and the next element), but the lambda in your code takes only one argument and references an undefined y, which is why the worker raises the TypeError. Use this for reduce: rdd_temp_k2.reduce(lambda x, y: x + y)
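For reference, the full corrected flow as a minimal sketch (it assumes an existing SparkContext named sc, as in the question):

temp_c = [10, 3, -5, 25, 1, 9, 29, -10, 5]
rdd_temp_c = sc.parallelize(temp_c)                  # distribute the Celsius readings
rdd_temp_k2 = rdd_temp_c.map(lambda x: x + 273.15)   # convert each element to Kelvin

# reduce() combines elements pairwise, so its function must accept two arguments
total_k = rdd_temp_k2.reduce(lambda x, y: x + y)
print(total_k)                                       # Kelvin total; the output below shows 2525.35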

>>> rdd_temp_k2.reduce(lambda x, y:x+y)
18/01/01 23:35:56 INFO SparkContext: Starting job: reduce at <stdin>:1
18/01/01 23:35:56 INFO DAGScheduler: Got job 1 (reduce at <stdin>:1) with 2 output partitions
18/01/01 23:35:56 INFO DAGScheduler: Final stage: ResultStage 1 (reduce at <stdin>:1)
18/01/01 23:35:56 INFO DAGScheduler: Parents of final stage: List()
18/01/01 23:35:56 INFO DAGScheduler: Missing parents: List()
18/01/01 23:35:56 INFO DAGScheduler: Submitting ResultStage 1 (PythonRDD[2] at reduce at <stdin>:1), which has no missing parents
18/01/01 23:35:56 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.0 KB, free 9.6 KB)
18/01/01 23:35:56 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.7 KB, free 12.3 KB)
18/01/01 23:35:56 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.16.1.100:37181 (size: 2.7 KB, free: 511.1 MB)
18/01/01 23:35:56 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1008
18/01/01 23:35:56 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (PythonRDD[2] at reduce at <stdin>:1)
18/01/01 23:35:56 INFO YarnScheduler: Adding task set 1.0 with 2 tasks
18/01/01 23:35:56 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, wn02.itversity.com, partition 0,PROCESS_LOCAL, 2093 bytes)
18/01/01 23:35:56 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, wn04.itversity.com, partition 1,PROCESS_LOCAL, 2112 bytes)
18/01/01 23:35:56 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on wn04.itversity.com:50016 (size: 2.7 KB, free: 511.1 MB)
18/01/01 23:35:56 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on wn02.itversity.com:46512 (size: 2.7 KB, free: 511.1 MB)
18/01/01 23:35:56 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 84 ms on wn04.itversity.com (1/2)
18/01/01 23:35:56 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 88 ms on wn02.itversity.com (2/2)
18/01/01 23:35:56 INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool 
18/01/01 23:35:56 INFO DAGScheduler: ResultStage 1 (reduce at <stdin>:1) finished in 0.088 s
18/01/01 23:35:56 INFO DAGScheduler: Job 1 finished: reduce at <stdin>:1, took 0.110000 s
2525.35
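As a quick sanity check outside Spark (a sketch, not from the original thread), the same two-argument rule applies to Python's own reduce, and it gives the same total:

from functools import reduce   # built into Python 2; imported from functools on Python 3

temp_k = [t + 273.15 for t in [10, 3, -5, 25, 1, 9, 29, -10, 5]]
print(reduce(lambda x, y: x + y, temp_k))   # same total as the Spark job above (2525.35, up to float rounding)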