Task not serializable while using broadcast

scala

#1

Hi all,
For the daily revenue problem, I am trying to run the code below, which uses a broadcast variable:

import scala.io.Source
val products = Source.fromFile("/data/retail_db/products/part-00000").getLines
val productsMap = products.map(product => (product.split(",")(0).toInt, product.split(",")(2))).toMap
val bv = sc.broadcast(productsMap)

Then, when I ran:
val dailyRevenuePerProductName = dailyRevenuePerProductId.map(rec => {
((rec._1._1, -rec._2), rec._1._1 + "," + rec._2 + "," + bv.value.get(rec._1._2).get)
})

I am getting this error:

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2079)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:331)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:330)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
at org.apache.spark.rdd.RDD.map(RDD.scala:330)
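at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:63)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:65)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:67)
at $iwC$$iwC$$iwC.<init>(<console>:69)
at $iwC$$iwC.<init>(<console>:71)
at $iwC.<init>(<console>:73)
at <init>(<console>:75)
at .<init>(<console>:79)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)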
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:875)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:875)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: scala.io.BufferedSource$BufferedLineIterator
Serialization stack:
- object not serializable (class: scala.io.BufferedSource$BufferedLineIterator, value: empty iterator)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: products, type: interface scala.collection.Iterator)
- object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@263ac7)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
- object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@7e19d69b)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
- object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC@600e8be)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
- object (class $iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC@4998c57a)
- field (class: $iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC)
- object (class $iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC@48b5669c)
- field (class: $iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC)
- object (class $iwC$$iwC$$iwC, $iwC$$iwC$$iwC@4b1412eb)
- field (class: $iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC)
- object (class $iwC$$iwC, $iwC$$iwC@5764c072)
- field (class: $iwC, name: $iw, type: class $iwC$$iwC)
- object (class $iwC, $iwC@3c2fdd04)
- field (class: $line29.$read, name: $iw, type: class $iwC)
- object (class $line29.$read, $line29.$read@6cca79e7)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $VAL82, type: class $line29.$read)
- object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@2e607b8)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
- object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@6479e6f6)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
- object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)

Please help
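
The "Caused by" line names scala.io.BufferedSource$BufferedLineIterator, i.e. the products iterator, not the broadcast map. Below is a minimal sketch, outside Spark, that runs the same kind of check with plain Java serialization. The helper isJavaSerializable, the temp file, and the sample rows are assumptions made up for illustration; they are not part of the original code or data.

```scala
import java.io.{ByteArrayOutputStream, File, NotSerializableException, ObjectOutputStream, PrintWriter}
import scala.io.Source

object SerializableCheck {
  // Hypothetical helper: true if plain Java serialization accepts the object.
  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    // Stand-in for /data/retail_db/products/part-00000; these rows are invented.
    val file = File.createTempFile("products", ".csv")
    file.deleteOnExit()
    val writer = new PrintWriter(file)
    writer.println("1,2,Product A,,59.98,http://example.com/a")
    writer.println("2,2,Product B,,129.99,http://example.com/b")
    writer.close()

    val source = Source.fromFile(file)
    val lines = source.getLines()           // lazy iterator tied to the open file
    println(isJavaSerializable(lines))      // prints false

    val productsMap = lines.map { p =>
      val fields = p.split(",")
      (fields(0).toInt, fields(2))
    }.toMap                                 // fully materialized immutable Map
    source.close()
    println(isJavaSerializable(productsMap)) // prints true
  }
}
```

The map itself serializes fine, so the failure seems to come from the iterator still being reachable from the closure, not from the value being broadcast.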


#2

Hi @sachin_K

Can you send the full code?


#3

Hi, I compared my code with the video and found the difference below.

I was using:

import scala.io.Source
val products1 = Source.fromFile("/data/retail_db/products/part-00000").getLines
val products = products1.toList
val productsMap = products.map(product => (product.split(",")(0).toInt, product.split(",")(2))).toMap
val bv = sc.broadcast(productsMap)
val dailyRevenuePerProductName = dailyRevenuePerProductId.map(rec => {((rec._1._1, bv.value.get(rec._1._2).get), rec._2)})

However, in the video, Durga sir is using:

import scala.io.Source
val products = Source.fromFile("/data/retail_db/products/part-00000").getLines.toList
val productsMap = products.map(product => (product.split(",")(0).toInt, product.split(",")(2))).toMap
val bv = sc.broadcast(productsMap)
val dailyRevenuePerProductName = dailyRevenuePerProductId.map(rec => {((rec._1._1, bv.value.get(rec._1._2).get), rec._2)})

I am not sure why this line is the issue: val products = products1.toList
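
One likely explanation, going by the serialization stack in the first post: the shell stores each top-level val as a field of its wrapper class ($iwC...), and the closure passed to map reaches that wrapper through $outer. Any val that still holds the raw getLines iterator (products in the first post, products1 here) therefore gets pulled into the task, while the video's version never binds the iterator to a val. Below is a minimal sketch of one way to keep the iterator local; the helper name loadProductsMap, the temp file, and the sample rows are assumptions for illustration only.

```scala
import java.io.{File, PrintWriter}
import scala.io.Source

object ProductsLookup {
  // Hypothetical helper: parses "id,category,name,..." lines into an id -> name map.
  // The iterator exists only inside this method, so no outer val keeps a reference to it.
  def loadProductsMap(path: String): Map[Int, String] = {
    val source = Source.fromFile(path)
    try {
      source.getLines().map { line =>
        val fields = line.split(",")
        (fields(0).toInt, fields(2))
      }.toMap
    } finally source.close()
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for the real products file; these rows are invented.
    val file = File.createTempFile("products", ".csv")
    file.deleteOnExit()
    val writer = new PrintWriter(file)
    writer.println("1,2,Product A,,59.98,http://example.com/a")
    writer.println("2,2,Product B,,129.99,http://example.com/b")
    writer.close()

    val productsMap = loadProductsMap(file.getPath)
    println(productsMap(1)) // prints Product A

    // In spark-shell this would be followed by (not run here, needs a SparkContext):
    // val bv = sc.broadcast(loadProductsMap("/data/retail_db/products/part-00000"))
  }
}
```

Chaining .getLines.toList on one line, as in the video, has the same effect; the helper just makes the scoping explicit and also closes the file.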