BigData Labs - Pending case for the last 10 days: Spark and Kafka dependency issue with jars (Spark Streaming Kafka integration, CCA175)

cca-175
apache-spark
bigdatalabs

#1

Hi,

I am trying to run the spark-submit command shown below and I get an error.

spark-submit \
  --class KafkaStreamingDepartmentCount \
  --master yarn \
  --conf spark.ui.port=15265 \
  --jars "/usr/hdp/2.5.0.0-1245/kafka/libs/spark-streaming-kafka_2.10-1.6.2.jar,/usr/hdp/2.5.0.0-1245/kafka/libs/kafka_2.10-0.8.2.1.jar,/usr/hdp/2.6.5.0-292/kafka/libs/metrics-core-2.2.0.jar" \
  sparkstreamdemo_2.11-0.1.jar yarn-client

Error: when I look it up on Google, it mostly points to a version or dependency issue. When I perform sbt package, the jar is generated fine, but on running spark-submit I get this error.
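For comparison, spark-submit can also resolve the Kafka connector and its transitive dependencies from Maven via --packages instead of listing jars by hand. This is only a sketch: it assumes the gateway host has internet access, and the _2.10 suffix in the coordinate has to match the Scala binary version the application jar was actually built with (the jar name above says _2.11, which is worth double-checking).

spark-submit \
  --class KafkaStreamingDepartmentCount \
  --master yarn \
  --conf spark.ui.port=15265 \
  --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2 \
  sparkstreamdemo_2.11-0.1.jar yarn-client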

I have tried the below two sbt configurations:

First way:

name := "SparkStreamDemo"

version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume-sink_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.2"
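For reference, with this first configuration sbt package would (under sbt's default artifact naming) emit a Scala 2.10 jar, not the _2.11 one used in the submit command above:

sbt package
# expected default output: target/scala-2.10/sparkstreamdemo_2.10-1.0.jar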

Second way:

name := "SparkStreamDemo"

version := "0.1"
scalaVersion := "2.11.12"

val sparkVersion = "2.3.0"

resolvers ++= Seq(
  "apache-snapshots" at "http://repository.apache.org/snapshots/"
)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" % "spark-streaming-flume_2.11" % "2.3.2",
  "org.apache.spark" % "spark-streaming-flume-sink_2.11" % "2.3.2",
  "org.apache.commons" % "commons-lang3" % "3.2.1",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.2",
  "org.scala-lang" % "scala-library" % "2.11.7",
  "mysql" % "mysql-connector-java" % "5.1.6"
)
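For comparison only (a sketch, not necessarily the fix for the specific error, which isn't shown): if the build stays on Scala 2.11 / Spark 2.3, the Kafka 0.8 direct-stream connector that matches those versions is spark-streaming-kafka-0-8_2.11, so the Kafka-related lines would look roughly like this instead of the _2.10 1.6.2 artifact:

scalaVersion := "2.11.12"

val sparkVersion = "2.3.0"

libraryDependencies ++= Seq(
  // "provided" because the cluster supplies Spark itself at runtime
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  // Kafka 0.8 direct-stream API: KafkaUtils.createDirectStream with StringDecoder
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVersion
)

Mixing _2.10 1.6.2 artifacts into a 2.11 / 2.3 build is a common source of NoSuchMethodError / NoClassDefFoundError at submit time, so aligning these versions is worth trying.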

KafkaStreamingDepartmentCount object (Scala source)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

object KafkaStreamingDepartmentCount {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Kafka Streaming Department Count").setMaster(args(0))
    val ssc = new StreamingContext(conf, Seconds(30))
    //val kafkaParams = Map[String, String]("metadata.broker.list" -> "wn01.itversity.com:6667,wn02.itversity.com:6667,wn03.itversity.com:6667,wn04.itversity.com:6667")
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "nn01.itversity.com:6667,nn02.itversity.com:6667,rm01.itversity.com:6667")
    val topicSet = Set("fkdemojv")

    // Consume data from the Kafka topic, i.e. create a stream that consumes Kafka topics.
    // The messages in the stream are (key, value) pairs in binary format and we have to convert them to strings,
    // so we use the Kafka StringDecoder. The call has the following shape:
    // val directKafkaStream = KafkaUtils.createDirectStream[[key class], [value class], [key decoder class],
    //   [value decoder class]](streamingContext, [map of Kafka parameters (kafka broker list or bootstrap.servers)], [set of topics to consume])

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    // Extract the data. Each record in the stream is a (key, value) tuple and we only need the value.
    val messages = stream.map(s => s._2)

    // Keep only the log lines whose request endpoint starts with /department.
    val departmentMessages = messages.
      filter(msg => {
        val endPoint = msg.split(" ")(6)
        endPoint.split("/")(1) == "department"
      })

    // Emit (department name, 1) for each matching log line.
    val departments = departmentMessages.
      map(ele => {
        val endPoint = ele.split(" ")(6)
        (endPoint.split("/")(2), 1)
      })

    // Count hits per department within each batch.
    val departmentTraffic = departments.
      reduceByKey((total, value) => total + value)

    departmentTraffic.saveAsTextFiles("/user/jvanchir/departmentwisetraffic/cnt")

    ssc.start()
    ssc.awaitTermination()
  }
}
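Once the job is running, saveAsTextFiles should write one directory per 30-second batch under the prefix used in the code, named cnt-&lt;batch time in ms&gt;. A quick way to check the output from the gateway (paths as in the code above):

hdfs dfs -ls /user/jvanchir/departmentwisetraffic
hdfs dfs -cat /user/jvanchir/departmentwisetraffic/cnt-*/part-00000 | head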

Flume configuration file (wskafka.conf)

# wskafka.conf: A single-node Flume configuration
# to read data from webserver logs and publish
# to a Kafka topic

# Name the components on this agent
wk.sources = ws
wk.sinks = kafka
wk.channels = mem

# Describe/configure the source
wk.sources.ws.type = exec
wk.sources.ws.command = tail -F /opt/gen_logs/logs/access.log

# Describe the sink
wk.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
wk.sinks.kafka.brokerList = wn01.itversity.com:6667,wn02.itversity.com:6667,wn03.itversity.com:6667,wn04.itversity.com:6667
wk.sinks.kafka.topic = fkdemojv

# Use a channel which buffers events in memory
wk.channels.mem.type = memory
wk.channels.mem.capacity = 1000
wk.channels.mem.transactionCapacity = 100

# Bind the source and sink to the channel
wk.sources.ws.channels = mem
wk.sinks.kafka.channel = mem
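For the end-to-end flow, the Flume agent is started with the agent name used in the properties above, and the topic can be checked with the console consumer. The --conf directory and the kafka-broker path below are the typical HDP locations and may differ on the cluster; on older Kafka 0.8 brokers the consumer takes --zookeeper instead of --bootstrap-server.

flume-ng agent --name wk --conf /etc/flume/conf --conf-file wskafka.conf

/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
  --bootstrap-server wn01.itversity.com:6667 \
  --topic fkdemojv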

Could you please help me out, as I am unable to proceed further with this?