Structured Streaming with Kafka


#1

Hi All,

Below are the lines of code that I ran on the spark-shell after launching it with the first command.
I am simply trying to read messages from a Kafka topic and write them out to an HDFS location.

This throws the error below.

If anyone here could help, it would be great.

export SPARK_MAJOR_VERSION=2

spark-shell \
--master yarn \
--num-executors 2 \
--executor-memory 512M \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

val df = spark.readStream.format("org.apache.spark.sql.kafka010.KafkaSourceProvider").option("kafka.bootstrap.servers","nn01.itversity.com:2181,nn02.itversity.com:2181,rm01.itversity.com:2181").option("subscribe","test_vihit").load
df: org.apache.spark.sql.DataFrame = [key: binary, value: binary … 5 more fields]

scala> df.writeStream.format("parquet").option("path","DataSet/KafkaTest").option("checkpointLocation","DataSet/KafkaTestCheckPoint").start()
java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/streaming/Source$class
at org.apache.spark.sql.kafka010.KafkaSource.&lt;init&gt;(KafkaSource.scala:92)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:152)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:240)
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$1.applyOrElse(StreamingQueryManager.scala:245)
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$1.applyOrElse(StreamingQueryManager.scala:241)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:278)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:268)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:241)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:287)
… 48 elided


#2

@dgadiraju
@koushikmln


#3

A class-not-found error is typically the result of not having the appropriate jar files on the classpath. You need to work out which jar files are required and add them when launching spark-shell.
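For what it's worth, the specific class missing in the trace (`org.apache.spark.sql.execution.streaming.Source$class`) often points to a version mismatch rather than a missing jar: `spark-sql-kafka-0-10_2.11:2.1.0` was built against Spark 2.1, and Spark's internal streaming classes changed in later releases. A sketch of a launch command follows, assuming the lab runs Spark 2.3.x (check with `spark-shell --version`; the `2.3.0` below is an assumption and must be replaced with whatever Spark version the cluster actually runs):

```shell
# Assumption: the cluster runs Spark 2.3.x built with Scala 2.11.
# Match the Kafka connector version to the Spark version exactly;
# a mismatched connector fails with NoClassDefFoundError on internal
# classes such as Source$class.
export SPARK_MAJOR_VERSION=2
spark-shell \
  --master yarn \
  --num-executors 2 \
  --executor-memory 512M \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
```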


#4

@dgadiraju Sir, are we planning to have the latest Spark versions on our cloud lab? This would help people get acquainted with the latest APIs and features of Spark.

Thanks,
Vihit.


#5

@Sunil_Abhishek Can you please provide an update? Thanks!


#6

@Vihit_Shah

You can launch Spark 2.x with the command below in our labs.

export SPARK_MAJOR_VERSION=2
spark-shell \
--master yarn \
--deploy-mode client \
--conf spark.ui.port=12335 \
--num-executors 2 \
--executor-memory 512M
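Once the shell is up with a matching `--packages` version (the command above does not include one, so the Kafka source still needs to be added), a minimal sketch of the original read/write flow is below. Two assumptions worth flagging: the short format name `"kafka"` is used instead of the full provider class name, and `kafka.bootstrap.servers` must point at the Kafka brokers (typically port 6667 on HDP, or 9092 by default), not at the ZooKeeper quorum on port 2181 as in the original snippet. The hostname, topic, and paths are carried over from the original post and may need adjusting for your lab.

```scala
// Sketch only: run inside a spark-shell launched with a matching
// spark-sql-kafka-0-10 --packages version.
// Assumption: nn01.itversity.com:6667 is a Kafka broker address;
// bootstrap servers must be brokers, not ZooKeeper nodes.
val df = spark.readStream
  .format("kafka")                 // short name for KafkaSourceProvider
  .option("kafka.bootstrap.servers", "nn01.itversity.com:6667")
  .option("subscribe", "test_vihit")
  .load()

// Write the raw key/value stream out to HDFS as parquet files,
// with a checkpoint location for fault tolerance.
df.writeStream
  .format("parquet")
  .option("path", "DataSet/KafkaTest")
  .option("checkpointLocation", "DataSet/KafkaTestCheckPoint")
  .start()
```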