Spark and Kafka Jar Dependency Issue - Bigdata Labs (Spark Streaming Kafka Integration CCA175)

cca-175
apache-spark
bigdatalabs

#1

Hi,

I am trying to run a spark-submit command and I get the following error.

spark-submit \
  --class KafkaStreamingDepartmentCount \
  --master yarn \
  --conf spark.ui.port=15265 \
  --jars "/usr/hdp/2.5.0.0-1245/kafka/libs/spark-streaming-kafka_2.10-1.6.2.jar,/usr/hdp/2.5.0.0-1245/kafka/libs/kafka_2.10-0.8.2.1.jar,/usr/hdp/2.6.5.0-292/kafka/libs/metrics-core-2.2.0.jar" \
  sparkstreamdemo_2.11-0.1.jar yarn-client

Error: I looked it up on Google and it mostly points to a version or dependency issue. When I run sbt package, the jar is generated fine, but on running spark-submit I get this error.

I have the two sbt configurations below:

First way:

name := "SparkStreamDemo"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume-sink_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.2"

Second way:

name := "SparkStreamDemo"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.3.0"
resolvers ++= Seq(
  "apache-snapshots" at "http://repository.apache.org/snapshots/"
)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" % "spark-streaming-flume_2.11" % "2.3.2",
  "org.apache.spark" % "spark-streaming-flume-sink_2.11" % "2.3.2",
  "org.apache.commons" % "commons-lang3" % "3.2.1",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.2",
  "org.scala-lang" % "scala-library" % "2.11.7",
  "mysql" % "mysql-connector-java" % "5.1.6"
)

KafkaStreamingDepartmentCount - Object Class

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext,Seconds}
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

object KafkaStreamingDepartmentCount {

  def main(args: Array[String])
  {
    val conf = new SparkConf().setAppName("Kafka Streaming Department Count").setMaster(args(0))
    val ssc = new StreamingContext(conf,Seconds(30))
    //val kafkaParams = Map[String, String]("metadata.broker.list" -> "wn01.itversity.com:6667,wn02.itversity.com:6667,wn03.itversity.com:6667,wn04.itversity.com:6667")
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "nn01.itversity.com:6667,nn02.itversity.com:6667,rm01.itversity.com:6667")
    val topicSet = Set("fkdemojv")

    // Consume data from the Kafka topic, i.e. create a stream that consumes Kafka topics.
    // The messages that arrive as part of the stream are (key, value) pairs in binary format and we have to convert them to strings.
    // So we use the Kafka StringDecoder. The stream has to be created in the following format:
    // val directKafkaStream = KafkaUtils.createDirectStream[ [key class], [value class], [key decoder class],
    // [value decoder class] ]( streamingContext, [map of Kafka parameters (kafka broker list or bootstrap.servers)], [set of topics to consume])

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    // Extract the data. The data received as a stream is a tuple of (key, value) pairs and we need to extract only the value.
    val messages = stream.map(s => s._2)

    val departmentMessages = messages.
      filter(msg => {
        val endPoint = msg.split(" ")(6)
        endPoint.split("/")(1) == "department"
      })

    val departments = departmentMessages.
      map(ele => {
        val endPoint = ele.split(" ")(6)
        (endPoint.split("/")(2), 1)
      })

    val departmentTraffic = departments.
      reduceByKey((total,value) => total+value)

    departmentTraffic.saveAsTextFiles("/user/jvanchir/departmentwisetraffic/cnt")

    ssc.start()
    ssc.awaitTermination()

  }
}

Flume conf file properties

# wskafka.conf: A single-node Flume configuration
# to read data from webserver logs and publish
# to kafka topic

# Name the components on this agent
wk.sources = ws
wk.sinks = kafka
wk.channels = mem

# Describe/configure the source
wk.sources.ws.type = exec
wk.sources.ws.command = tail -F /opt/gen_logs/logs/access.log

# Describe the sink
wk.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
wk.sinks.kafka.brokerList = wn01.itversity.com:6667,wn02.itversity.com:6667,wn03.itversity.com:6667,wn04.itversity.com:6667
wk.sinks.kafka.topic = fkdemojv

# Use a channel which buffers events in memory
wk.channels.mem.type = memory
wk.channels.mem.capacity = 1000
wk.channels.mem.transactionCapacity = 100

# Bind the source and sink to the channel
wk.sources.ws.channels = mem
wk.sinks.kafka.channel = mem

Could you please help me out, as I am unable to proceed further on this?


#2

@Jayvardhan_Reddy_Van You are pointing to the wrong location for the JARs. The labs have already been upgraded.

Use the below location for the JAR: /usr/hdp/2.6.5.0-292/kafka/libs/spark-streaming-kafka_2.10-1.6.3.jar


#3

Hi Balu,

The jar is missing from the directory you specified. Please find the attached screenshot for the same.
All the Spark dependencies related to Flume, Kafka and Streaming are missing from that directory; could you please add them?

  "org.apache.spark" %% "spark-core" % sparkVersion,

“org.apache.spark” %% “spark-sql” % sparkVersion,
“org.apache.spark” %% “spark-mllib” % sparkVersion,
“org.apache.spark” %% “spark-streaming” % sparkVersion,
“org.apache.spark” %% “spark-hive” % sparkVersion,
“org.apache.spark” % “spark-streaming-flume_2.11” % “2.3.2”,
“org.apache.spark” % “spark-streaming-flume-sink_2.11” % “2.3.2”,
“org.apache.commons” % “commons-lang3” % “3.2.1”,
“org.apache.spark” % “spark-streaming-kafka_2.10” % “1.6.2”,


#4

Can you place your code in a GitHub repository and share it with us? You are mixing up versions: you have defined both Spark 2.3 and Spark 1.6 dependencies as part of your program.

It is not just about missing jars. Are you referring to any of my examples, or running your own? I would recommend following my examples, getting them to run successfully, and then making changes depending upon your requirements.

@Sunil_Itversity or @BaLu_SaI - please keep track of this issue and ping me whenever @Jayvardhan_Reddy_Van responds in this thread.


#5

Hi Durga/Balu/Sunil,

@dgadiraju @BaLu_SaI @Sunil_Itversity
I was referring to your CCA175 playlist, videos 142-144.

As part of this Spark Streaming exercise, I have tried both dependency versions, Spark 1.6 as well as Spark 2.3, individually.
You can find the two sbt files for the same in the link below as well.

Please find the GitHub Gist link below.

Please let me know if you require any other information regarding the same.


#6

@Jayvardhan_Reddy_Van - it will be tough to check out the code from a gist. I would recommend creating a GitHub repository and sharing it, so that we can check out the code, build the jar file and run it.


#7

We have gone through your code. Your project is built using Scala 2.11.12, whereas you are using spark-streaming-kafka_2.10, which is built for Scala 2.10. This results in binary incompatibility issues.

You need to fix that, then compile the jar file and run it on the cluster.

name := "SparkStreamDemo"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.3.0"
resolvers ++= Seq(
  "apache-snapshots" at "http://repository.apache.org/snapshots/"
)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" % "spark-streaming-flume_2.11" % "2.3.2",
  "org.apache.spark" % "spark-streaming-flume-sink_2.11" % "2.3.2",
  "org.apache.commons" % "commons-lang3" % "3.2.1",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.2",
  "org.scala-lang" % "scala-library" % "2.11.7",
  "mysql" % "mysql-connector-java" % "5.1.6"
)
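
For illustration only, a consistent build definition could look something like the sketch below, assuming Spark 2.3.0 on Scala 2.11 and the newer kafka-0-10 integration (validate the exact artifact and version against what is available on the cluster before using it):

name := "SparkStreamDemo"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.3.0"

libraryDependencies ++= Seq(
  // All Spark artifacts resolved with %% so they follow scalaVersion (2.11)
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  // Kafka integration built against the new consumer API; the version here is an assumption
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)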

#8

Thanks @dgadiraju @Sunil_Itversity @BaLu_SaI ,

Can I use the streaming version below to make it compatible? If yes, should the jar below also be present on the Big Data Labs cluster for compilation and execution?

"org.apache.spark" %% “spark-streaming-kafka-0-10” % "2.3.1"

I used the version below earlier because this jar was present on the Big Data Labs cluster when I checked the /usr/hdp location for the spark-streaming jar.

"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.2"

On executing the command below, what location of the jar am I supposed to specify for the Spark job on the cluster?

spark-submit \
  --class KafkaStreamingDepartmentCount \
  --master yarn \
  --conf spark.ui.port=15265 \
  --jars "/usr/hdp/2.5.0.0-1245/kafka/libs/spark-streaming-kafka_2.10-1.6.2.jar,/usr/hdp/2.5.0.0-1245/kafka/libs/kafka_2.10-0.8.2.1.jar,/usr/hdp/2.6.5.0-292/kafka/libs/metrics-core-2.2.0.jar" \
  sparkstreamdemo_2.11-0.1.jar yarn-client


#9

You can give it a try. If you run into any issues, please share it as a GitHub project so that we can check it out, test it, and get back to you.


#10

Okay, thanks @dgadiraju