Getting "Task not serializable" exception while running Spark job

hive
apache-spark
hadoop


I am getting a "Task not serializable" exception.

Here is my sample code and error trace:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.types._

import scala.sys.process._

object th extends Serializable
{
  def main(args: Array[String]): Unit =
  {
    val conf = new SparkConf().setAppName("th").setMaster("local")
    conf.set("spark.debug.maxToStringFields", "10000000")
    val context = new SparkContext(conf)
    val sqlCotext = new SQLContext(context)
    val hiveContext = new HiveContext(context)
    import hiveContext.implicits._

    val list = hiveContext.sql("select application_number from ipg_interested_parties").collect().take(100)
    val l1 = context.parallelize(list)

    val stu1 = StructType(
      StructField("application_number", LongType, true) ::
      StructField("event_code", StringType, true) ::
      StructField("event_description", StringType, true) ::
      StructField("event_recorded_date", StringType, true) :: Nil)
    var initialDF1 = sqlCotext.createDataFrame(context.emptyRDD[Row], stu1)

    l1.repartition(10).foreachPartition(f => { f.foreach(f =>
    {
      val schema = StructType(List(
        StructField("queryResults", StructType(List(
          StructField("searchResponse", StructType(List(
            StructField("response", StructType(List(
              StructField("docs", ArrayType(StructType(List(
                StructField("transactions", ArrayType(StructType(List(
                  StructField("code", StringType, nullable = true),
                  StructField("description", StringType, nullable = true),
                  StructField("recordDate", StringType, nullable = true)
                ))))
              ))))
            )))
          )))
        )))
      ))

      val z = f.toString().replace("[", "").replace("]", "").replace(" ", "").replace("(", "").replace(")", "")
      if (z != null)
      {
        val cmd = Seq("curl", "-X", "POST", "--insecure",
          "--header", "Content-Type: application/json",
          "--header", "Accept: application/json",
          "-d", "{\"searchText\":\"" + z + "\",\"qf\":\"applId\"}",
          "https://ped.uspto.gov/api/queries") // cmd.!
        val r = cmd.!!
        val r1 = r.toString()
        val rdd = context.parallelize(Seq(r1))
        val dff = sqlCotext.read.schema(schema).json(rdd.toDS)
        val dfContent = dff.select(explode(dff("queryResults.searchResponse.response.docs.transactions"))).toDF("transaction")
        val a1 = dfContent.select("transaction.code").collect()
        val a2 = dfContent.select("transaction.description").collect()
        val a3 = dfContent.select("transaction.recordDate").collect()
        for (mmm1 <- a1; mm2 <- a2; mm3 <- a3)
        {
          val ress1 = mmm1.toString().replace("[", " ").replace("]", " ").replace("WrappedArray(", "").replace(")", "")
          val res2 = mm2.toString().replace("[", " ").replace("]", " ").replace("WrappedArray(", "").replace(")", "")
          val res3 = mm3.toString().replace("[", " ").replace("]", " ").replace("WrappedArray(", "").replace(")", "")
          initialDF1 = initialDF1.union(Seq((z, ress1, res2, res3)).toDF("application_number", "event_code", "event_description", "event_recorded_date"))
        }
      }
    })})

    initialDF1.registerTempTable("curlTH")
    hiveContext.sql("insert into table default.ipg_transaction_history select application_number,event_code,event_description,event_recorded_date from curlTH")
  }
}

Here is my error trace:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
at newipg170103.th$.main(th.scala:58)
at newipg170103.th.main(th.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@1e592ef2)
- field (class: newipg170103.th$$anonfun$main$1, name: context$1, type: class org.apache.spark.SparkContext)
- object (class newipg170103.th$$anonfun$main$1, )
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
… 20 more
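
From the serialization stack, it looks like the SparkContext (the context$1 field) is being captured by the closure passed to foreachPartition at th.scala:58. For reference, here is a minimal sketch of that pattern (hypothetical names, not my actual job, assuming a local SparkContext); it raises the same exception:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical minimal example: referencing the SparkContext inside a
// foreachPartition closure forces Spark to serialize it, and SparkContext
// is not serializable, so the task fails before it is shipped to executors.
object ClosureCaptureSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local"))
    val numbers = sc.parallelize(1 to 10)
    numbers.foreachPartition { part =>
      // `sc` is captured by this closure ->
      // java.io.NotSerializableException: org.apache.spark.SparkContext
      part.foreach(n => sc.parallelize(Seq(n)))
    }
  }
}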
