Spark submit to read Kafka topic - error

Hi,
I was following the Flume + Spark + Kafka integration video and submitted the job on the Big Data Lab.
I started the Flume agent as well as spark-submit to read from Kafka, and I got an ArrayIndexOutOfBoundsException:

spark.yarn.driver.memoryOverhead is set but does not apply in client mode.
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0

Can you please let me know what the reason could be?

Thanks

Are you getting data into the Kafka topic?

You need to run the kafka-console-consumer command to see whether data is flowing into the topic.
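
A typical check looks something like the sketch below. This is only an illustration; the topic (kafkadm) and broker (nn02.itversity.com:6667) are taken from the code and spark-submit command later in this thread, so substitute your own values. On older clusters you may need --zookeeper <zookeeper-host>:2181 instead of --bootstrap-server.

kafka-console-consumer.sh --bootstrap-server nn02.itversity.com:6667 --topic kafkadm --from-beginning

If events show up here while the Flume agent is running, the Kafka side is fine and the problem is on the Spark side.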

Yes, I am getting data in the Kafka topic.

Here is my Scala code.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream

object StreamingDepartmentAnalysis {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().
      setAppName("DepartmentWiseCount").setMaster("yarn-client")
    val topicsSet = "kafkadm".split(",").toSet
    val kafkaParams =
      Map[String, String]("metadata.broker.list" -> "nn02.itversity.com:6667")
    val ssc = new StreamingContext(sparkConf, Seconds(60))
    val messages: InputDStream[(String, String)] = KafkaUtils.
      createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)

    val lines = messages.map(_._2)
    val linesFiltered = lines.filter(rec => rec.contains("GET /department/"))
    val countByDepartment = linesFiltered.
      map(rec => (rec.split(" ")(6).split("/")(2), 1)).
      reduceByKey(_ + _)
    //  reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(300), Seconds(60))
    //  countByDepartment.saveAsTextFiles(args(0))

    // Below function call will save the data into HDFS
    countByDepartment.saveAsTextFiles(args(0))
    ssc.start()
    ssc.awaitTermination()
  }
}

Here is my config file.
For now I have marked the Kafka topic for deletion, thinking I would create a new one and test (see the command sketch below). Can the Spark program read from a deleted topic?
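
If I create a new one, I plan to do it roughly like this (the Zookeeper host and the new topic name are placeholders, not the actual values from my lab):

kafka-topics.sh --create --zookeeper <zookeeper-host>:2181 --replication-factor 1 --partitions 1 --topic <new-topic>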

Thanks

Hi,
I had the wrong spark-submit command, which is why it was throwing the exception. I was supposed to pass the output file name as an argument.

Can you show your exact spark command for this job?

This is solved. I was not passing the output file as an argument. When I passed it as shown below,
it worked.
spark-submit --class StreamingDepartmentAnalysis \
  --master yarn \
  --conf spark.ui.port=22231 \
  --jars "/usr/hdp/2.5.0.0-1245/spark/lib/spark-streaming_2.10-1.6.2.jar,/usr/hdp/2.5.0.0-1245/kafka/libs/spark-streaming-kafka_2.10-1.6.2.jar,/usr/hdp/2.5.0.0-1245/kafka/libs/kafka_2.10-0.8.2.1.jar,/usr/hdp/2.5.0.0-1245/kafka/libs/metrics-core-2.2.0.jar" \
  flumekafkasparkwordcount_2.10-1.0.jar /user/digambarmishra/streaming/streamingdepartmentanalysis

I need help now.
How did you build your jar? Are you using SBT or Maven?

Also, how are you starting Kafka/Flume? I cannot find where Durga tells us how to start Flume/Kafka for this project.


I am using SBT.

To get data into Kafka,
you need to build a Flume configuration file with a Kafka sink,
then start the Flume agent so it writes the logs to that Kafka sink.

This is covered in the Flume + Spark + Kafka integration video.
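
For reference, a minimal Flume agent sketch with a Kafka sink might look like the following. This is an assumption based on the broker and topic used elsewhere in this thread, not the exact config from the video; the exec source command and file paths are placeholders to adjust for your lab, and the property names are the pre-Flume-1.7 KafkaSink ones.

# flume_kafka.conf -- hypothetical example
wk.sources = ws
wk.channels = mem
wk.sinks = kafkasink

# exec source tailing the web server log (adjust the command/path for your environment)
wk.sources.ws.type = exec
wk.sources.ws.command = tail -F /opt/gen_logs/logs/access.log

# in-memory channel between the source and the Kafka sink
wk.channels.mem.type = memory
wk.channels.mem.capacity = 1000
wk.channels.mem.transactionCapacity = 100

# Kafka sink; broker and topic match the Scala code earlier in the thread
wk.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
wk.sinks.kafkasink.brokerList = nn02.itversity.com:6667
wk.sinks.kafkasink.topic = kafkadm

wk.sources.ws.channels = mem
wk.sinks.kafkasink.channel = mem

You would then start the agent with something like:

flume-ng agent --name wk --conf /etc/flume/conf --conf-file flume_kafka.conf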


How did you build your jar? Could you please let us know, or point to a video where it is covered?

Please look at this video.

Look at around the 26:06 mark. The code is present on GitHub, as shown in that video.

I took that code and used it in Scala. Using sbt package, I built the jar file.
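
In case it helps others, a minimal build.sbt for this kind of project could look like the sketch below. The project name and exact versions are assumptions based on the jar name and the --jars list in the spark-submit command above, so adjust them for your setup.

name := "flumekafkasparkwordcount"
version := "1.0"
scalaVersion := "2.10.6"

// Spark 1.6.2 on Scala 2.10, matching the jars shipped with HDP 2.5
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.2"

Running sbt package in the project directory then produces target/scala-2.10/flumekafkasparkwordcount_2.10-1.0.jar, which is what gets passed to spark-submit.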
