Kafka integration with Spark


Hi all,
I am getting an error and I am stuck. Can anybody help me resolve it?


// Flume configuration

#defining source, sink and channel
a2.sources = wl
a2.sinks = kafka
a2.channels = mem

#Describing the source to read data from.
a2.sources.wl.type = exec
a2.sources.wl.command = tail -F /opt/gen_logs/logs/access.log

#Describing Kafka sink
a2.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.kafka.brokerList = quickstart.cloudera:9092
a2.sinks.kafka.topic = flume2kafka

#Defining a channel that buffers events in memory
a2.channels.mem.type = memory
a2.channels.mem.capacity = 1000
a2.channels.mem.transactionCapacity = 100

#Binding the source and the sink to the channel
a2.sources.wl.channels = mem
a2.sinks.kafka.channel = mem
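
For reference, this is how I start the agent. The configuration above is saved as flume2kafka.conf in the home directory; that file name and the --conf directory are just my local choices:

flume-ng agent --name a2 --conf /etc/flume-ng/conf --conf-file /home/cloudera/flume2kafka.conf -Dflume.root.logger=INFO,console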
++++++++++++++++++++++++++++++++++++++++

//Developing Spark Streaming with Kafka Integration

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

object kafkaStreamingDeptCount
{
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Flume+Kafka+Spark Integrated Streaming").setMaster(args(0))
    val ssc = new StreamingContext(conf, Seconds(30))

    val kafkaParams = Map[String, String]("metadata.broker.list" -> "quickstart.cloudera:9092")
    val topicSet = Set("flume2kafka")

    val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
    val messages = directKafkaStream.map(s => s._2)
    val filterMessage = messages.filter(rec => {
      val endpoint = rec.split(" ")(6)
      endpoint.split("/")(2) == "department"
    })

    val mapMessages = filterMessage.map(msg => {
      val msgEndPoint = msg.split(" ")(6)
      (msgEndPoint.split("/")(2), 1)
    })

    val results = mapMessages.reduceByKey(_ + _)

    results.saveAsTextFiles("/user/hdfs/StreamApp/kfk")

    ssc.start()
    ssc.awaitTermination()
  }
}
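
For completeness, the build definition I compile this with looks roughly like the sketch below. The Spark version and Scala 2.10 are my assumptions based on the quickstart VM and the retail_2.10-1.0.jar name; adjust if yours differ.

// build.sbt (sketch only; versions are assumptions, not copied from my actual project)
name := "retail"
version := "1.0"
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
)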

//Deploying application on cluster

spark-submit \
  --class kafkaStreamingDeptCount \
  --master local[2] \
  --conf spark.ui.port=12456 \
  --jars "/usr/lib/kafka/libs/kafka_2.11-0.10.2-kafka-2.2.0.jar,/usr/lib/kafka/libs/kafka-streams-0.10.2-kafka-2.2.0.jar,/usr/lib/kafka/libs/metrics-core-2.2.0.jar" \
  retail_2.10-1.0.jar local quickstart.cloudera 9092
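
To check whether events are actually reaching the flume2kafka topic, this console-consumer command should work on the VM (assuming the CDH Kafka wrapper scripts are on the PATH; the script name and flags may differ on other installs):

kafka-console-consumer --bootstrap-server quickstart.cloudera:9092 --topic flume2kafka --from-beginning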

// The program ran and wrote the output directories, but there is no data in them

[cloudera@quickstart ~]$ hdfs dfs -ls /user/hdfs/StreamApp/
Found 15 items

drwxr-xr-x - cloudera supergroup 0 2018-05-03 00:11 /user/hdfs/StreamApp/kfk-1525286490000
drwxr-xr-x - cloudera supergroup 0 2018-05-03 00:12 /user/hdfs/StreamApp/kfk-1525286520000
drwxr-xr-x - cloudera supergroup 0 2018-05-03 00:12 /user/hdfs/StreamApp/kfk-1525286550000
[cloudera@quickstart ~]$ hdfs dfs -ls /user/hdfs/StreamApp/kfk-1525286550000
Found 1 items
drwxr-xr-x - cloudera supergroup 0 2018-05-03 00:12 /user/hdfs/StreamApp/kfk-1525286550000/_temporary
[cloudera@quickstart ~]$ hdfs dfs -ls /user/hdfs/StreamApp/kfk-1525286550000/_temporary
Found 1 items
drwxr-xr-x - cloudera supergroup 0 2018-05-03 00:12 /user/hdfs/StreamApp/kfk-1525286550000/_temporary/0
[cloudera@quickstart ~]$ hdfs dfs -ls /user/hdfs/StreamApp/kfk-1525286550000/_temporary/0
[cloudera@quickstart ~]$

// Getting this error and the job is failing:
18/05/03 00:12:30 INFO scheduler.TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
18/05/03 00:12:30 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 4, localhost, executor driver, partition 0, ANY, 2299 bytes)
18/05/03 00:12:30 INFO executor.Executor: Running task 0.0 in stage 4.0 (TID 4)
18/05/03 00:12:30 INFO kafka.KafkaRDD: Computing topic flume2kafka, partition 0 offsets 135 -> 163
18/05/03 00:12:30 INFO utils.VerifiableProperties: Verifying properties
18/05/03 00:12:30 INFO utils.VerifiableProperties: Property group.id is overridden to
18/05/03 00:12:30 INFO utils.VerifiableProperties: Property zookeeper.connect is overridden to
18/05/03 00:12:30 ERROR executor.Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.ArrayIndexOutOfBoundsException: 2
at kafkaStreamingDeptCount$$anonfun$3.apply(kafkaStreamingDeptCount.scala:18)
at kafkaStreamingDeptCount$$anonfun$3.apply(kafkaStreamingDeptCount.scala:16)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
18/05/03 00:12:30 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 4.0 (TID 4, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 2
at kafkaStreamingDeptCount$$anonfun$3.apply(kafkaStreamingDeptCount.scala:18)
at kafkaStreamingDeptCount$$anonfun$3.apply(kafkaStreamingDeptCount.scala:16)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

18/05/03 00:12:30 ERROR scheduler.TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
18/05/03 00:12:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
18/05/03 00:12:30 INFO scheduler.TaskSchedulerImpl: Cancelling stage 4
18/05/03 00:12:30 INFO scheduler.DAGScheduler: ShuffleMapStage 4 (map at kafkaStreamingDeptCount.scala:21) failed in 0.327 s due to Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 2
at kafkaStreamingDeptCount$$anonfun$3.apply(kafkaStreamingDeptCount.scala:18)
at kafkaStreamingDeptCount$$anonfun$3.apply(kafkaStreamingDeptCount.scala:16)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
18/05/03 00:12:30 INFO scheduler.DAGScheduler: Job 2 failed: saveAsTextFiles at kafkaStreamingDeptCount.scala:28, took 0.363702 s
18/05/03 00:12:30 INFO scheduler.JobScheduler: Finished job streaming job 1525286550000 ms.0 from job set of time 1525286550000 ms
18/05/03 00:12:30 INFO scheduler.JobScheduler: Total delay: 0.635 s for time 1525286550000 ms (execution: 0.564 s)
18/05/03 00:12:30 INFO rdd.ShuffledRDD: Removing RDD 10 from persistence list
18/05/03 00:12:30 INFO storage.BlockManager: Removing RDD 10
18/05/03 00:12:30 INFO rdd.MapPartitionsRDD: Removing RDD 9 from persistence list
18/05/03 00:12:30 INFO storage.BlockManager: Removing RDD 9
18/05/03 00:12:30 INFO rdd.MapPartitionsRDD: Removing RDD 8 from persistence list
18/05/03 00:12:30 INFO storage.BlockManager: Removing RDD 8
18/05/03 00:12:30 INFO rdd.MapPartitionsRDD: Removing RDD 7 from persistence list
18/05/03 00:12:30 INFO storage.BlockManager: Removing RDD 7
18/05/03 00:12:30 INFO kafka.KafkaRDD: Removing RDD 6 from persistence list
18/05/03 00:12:30 INFO storage.BlockManager: Removing RDD 6
18/05/03 00:12:30 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer()
18/05/03 00:12:30 INFO scheduler.InputInfoTracker: remove old batch metadata: 1525286490000 ms
18/05/03 00:12:30 ERROR scheduler.JobScheduler: Error running job streaming job 1525286550000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 2
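
From the stack trace, the ArrayIndexOutOfBoundsException: 2 seems to point at the endpoint.split("/")(2) call inside the filter. Is the problem that some log lines have a request path with fewer than three "/"-separated parts? Would a guard along these lines be the right direction? This is only a sketch of the idea, not the code in the jar above, and presumably the map that builds the (department, 1) pairs would need the same guard.

// Sketch only: the same filter, but skipping records whose line or path
// has fewer fields than the indices assume, instead of indexing blindly.
val filterMessage = messages.filter { rec =>
  val fields = rec.split(" ")
  fields.length > 6 && {
    val parts = fields(6).split("/")
    parts.length > 2 && parts(2) == "department"
  }
}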

I am using the Cloudera QuickStart VM, version 5.12.x.

Thanks in advance for your help. :blush: