Spark Structured Streaming integration with Kafka


#1

Hi All,

I wrote a simple Spark Structured Streaming program that reads from a Kafka source and writes back to Kafka. The source and sink topics are different. When I submit the job using spark-submit, the program starts without any errors but does not read or consume any messages from the topic. It simply waits at "Starting new streaming query" and nothing happens after that.

I am publishing messages into the source topic using kafka-console-producer and can see them with kafka-console-consumer, but the Spark program seems to be doing nothing.

My Spark program is:

package retail

import org.apache.spark.sql.{Encoders, SparkSession}

object SStreamFromKafka {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Spark Continuous Streaming Words")
      .master("local")
      .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    import spark.implicits._

    val stream = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "nn01.itversity.com:6667,nn02.itversity.com:6667")
      .option("subscribe", "kafkambnew")
      .option("startingOffsets", "earliest")
      .load()

    val words = stream.selectExpr("CAST(value AS STRING)")
      .as(Encoders.STRING)
      .flatMap(_.split(" "))

    val myquerystream = words.writeStream.format("kafka")
      .option("kafka.bootstrap.servers", "nn01.itversity.com:6667,nn02.itversity.com:6667")
      .option("topic", "kafkambnew1")
      .option("checkpointLocation", "/user/output/SStreamFromKafka")
      .start()

    myquerystream.awaitTermination()
  }
}

(Note: the checkpointLocation directory has been changed for this post.)
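
For debugging, a console-sink variant of the same query should show whether the source topic is being consumed at all. This is only a minimal sketch, reusing the same SparkSession, brokers and source topic as the program above:

// Debugging sketch (assumes the same SparkSession, brokers and source topic as above).
// The console sink prints each micro-batch to the driver's stdout, which separates
// "the source is not being read" from "the sink is not receiving writes".
val debugQuery = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "nn01.itversity.com:6667,nn02.itversity.com:6667")
  .option("subscribe", "kafkambnew")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .option("truncate", "false")
  .start()

debugQuery.awaitTermination()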

The log output of the program is:

18/08/13 12:08:03 INFO Client: Submitting application application_1533622723243_3571 to ResourceManager
18/08/13 12:08:03 INFO YarnClientImpl: Submitted application application_1533622723243_3571
18/08/13 12:08:03 INFO SchedulerExtensionServices: Starting Yarn extension services with app application_1533622723243_3571 and attemptId None
18/08/13 12:08:04 INFO Client: Application report for application_1533622723243_3571 (state: ACCEPTED)
18/08/13 12:08:04 INFO Client:
client token: N/A
diagnostics: AM container is launched, waiting for AM container to Register with RM
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1534176483370
final status: UNDEFINED
tracking URL: http://rm01.itversity.com:19288/proxy/application_1533622723243_3571/
user: madanbolla
18/08/13 12:08:05 INFO Client: Application report for application_1533622723243_3571 (state: ACCEPTED)
18/08/13 12:08:06 INFO Client: Application report for application_1533622723243_3571 (state: ACCEPTED)
18/08/13 12:08:07 INFO YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> rm01.itversity.com, PROXY_URI_BASES -> http://rm01.itversity.com:19288/proxy/application_1533622723243_3571), /proxy/application_1533622723243_3571
18/08/13 12:08:07 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
18/08/13 12:08:07 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark-client://YarnAM)
18/08/13 12:08:07 INFO Client: Application report for application_1533622723243_3571 (state: RUNNING)
18/08/13 12:08:07 INFO Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 172.16.1.103
ApplicationMaster RPC port: 0
queue: default
start time: 1534176483370
final status: UNDEFINED
tracking URL: http://rm01.itversity.com:19288/proxy/application_1533622723243_3571/
user: madanbolla
18/08/13 12:08:07 INFO YarnClientSchedulerBackend: Application application_1533622723243_3571 has started running.
18/08/13 12:08:07 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51931.
18/08/13 12:08:07 INFO NettyBlockTransferService: Server created on gw02.itversity.com:51931
18/08/13 12:08:07 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/08/13 12:08:07 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, gw02.itversity.com, 51931, None)
18/08/13 12:08:07 INFO BlockManagerMasterEndpoint: Registering block manager gw02.itversity.com:51931 with 366.3 MB RAM, BlockManagerId(driver, gw02.itversity.com, 51931, None)
18/08/13 12:08:07 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, gw02.itversity.com, 51931, None)
18/08/13 12:08:07 INFO BlockManager: external shuffle service port = 7447
18/08/13 12:08:07 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, gw02.itversity.com, 51931, None)
18/08/13 12:08:08 INFO EventLoggingListener: Logging events to hdfs:/spark2-history/application_1533622723243_3571
18/08/13 12:08:08 INFO Utils: Using initial executors = 0, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
18/08/13 12:08:08 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
18/08/13 12:08:08 INFO SharedState: loading hive config file: file:/etc/spark2/2.6.5.0-292/0/hive-site.xml
18/08/13 12:08:08 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/home/madanbolla/spark-warehouse/').
18/08/13 12:08:08 INFO SharedState: Warehouse path is 'file:/home/madanbolla/spark-warehouse/'.
18/08/13 12:08:09 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
18/08/13 12:08:10 INFO ConsumerConfig: ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [nn01.itversity.com:6667, nn02.itversity.com:6667]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 1
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = spark-kafka-source-eca6d447-ce8f-44e2-870f-52f6902d30fe--1925076224-driver-0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest

18/08/13 12:08:10 INFO ConsumerConfig: ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [nn01.itversity.com:6667, nn02.itversity.com:6667]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = consumer-1
ssl.endpoint.identification.algorithm = null
max.poll.records = 1
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = spark-kafka-source-eca6d447-ce8f-44e2-870f-52f6902d30fe--1925076224-driver-0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest

18/08/13 12:08:10 INFO AppInfoParser: Kafka version : 0.10.0.1
18/08/13 12:08:10 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
18/08/13 12:08:11 INFO MicroBatchExecution: Starting [id = 154dd662-ff41-4f7a-bc9e-d7078ad0a2f2, runId = a80f8eef-8466-4f8c-bd6b-30007b8cb0ce]. Use hdfs://nn01.itversity.com:8020/user/madanbolla/output/SStreamFromKafka to store the query checkpoint.
18/08/13 12:08:11 INFO ConsumerConfig: ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [nn01.itversity.com:6667, nn02.itversity.com:6667]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 1
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = spark-kafka-source-2670b0b3-30d1-4f8f-92e0-06f7b5525fe2-1230437668-driver-0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest

18/08/13 12:08:11 INFO ConsumerConfig: ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [nn01.itversity.com:6667, nn02.itversity.com:6667]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = consumer-2
ssl.endpoint.identification.algorithm = null
max.poll.records = 1
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = spark-kafka-source-2670b0b3-30d1-4f8f-92e0-06f7b5525fe2-1230437668-driver-0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest

18/08/13 12:08:11 INFO AppInfoParser: Kafka version : 0.10.0.1
18/08/13 12:08:11 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
18/08/13 12:08:11 INFO MicroBatchExecution: Starting new streaming query.

That is the end of the log I see on the console. After that, nothing happens no matter how long I wait.

Any help or suggestions would be appreciated.

Thanks
Madan


#2

I am not seeing any messages being published to the sink topic either.
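
One way to check the sink topic from Spark itself is a one-off batch read of it. A minimal sketch follows, with the broker and topic names taken from the program in the first post (this assumes a spark-shell or small driver with the spark-sql-kafka-0-10 package on the classpath):

// Batch read of the sink topic; shows whatever has been written to it so far.
val sinkContents = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "nn01.itversity.com:6667,nn02.itversity.com:6667")
  .option("subscribe", "kafkambnew1")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING)")

sinkContents.show(20, false)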


#3

Hi Admin (Itversity labs),

Further to the problem mentioned above, I enabled DEBUG logging in my Spark streaming application and found that the group coordinator lookup is responding with error code 15, i.e. GROUP_COORDINATOR_NOT_AVAILABLE (ConsumerCoordinatorNotAvailable).
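
For reference, one minimal way to raise the Kafka client loggers to DEBUG from inside the application is sketched below; it assumes the log4j 1.x backend that Spark 2.x bundles.

// Sketch: raise the Kafka client loggers to DEBUG so that the coordinator lookup
// requests and responses show up in the driver log (assumes log4j 1.x, bundled with Spark 2.x).
import org.apache.log4j.{Level, Logger}

Logger.getLogger("org.apache.kafka.clients").setLevel(Level.DEBUG)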

Further investigation and searching suggest that restarting the Kafka/ZooKeeper servers solves the problem. I am not sure whether it is a post-upgrade (Spark 2.3.x) issue.

Please help us resolve this issue.

Here is the log from the application:

18/08/14 13:17:13 DEBUG NetworkClient: Sending metadata request {topics=[kafkambnew]} to node 1003
18/08/14 13:17:13 DEBUG Metadata: Updated cluster metadata version 3 to Cluster(nodes = [rm01.itversity.com:6667 (id: 1003 rack: /default-rack), nn02.itversity.com:6667 (id: 1002 rack: /default-rack), nn01.itversity.com:6667 (id: 1001 rack: /default-rack)], partitions = [Partition(topic = kafkambnew, partition = 0, leader = 1002, replicas = [1002,1003,], isr = [1002,1003,], Partition(topic = kafkambnew, partition = 1, leader = 1003, replicas = [1003,1001,], isr = [1003,1001,]])
18/08/14 13:17:13 DEBUG AbstractCoordinator: Sending coordinator request for group kafkaDirectStream to broker rm01.itversity.com:6667 (id: 1003 rack: /default-rack)
18/08/14 13:17:13 DEBUG AbstractCoordinator: Received group coordinator response ClientResponse(receivedTimeMs=1534267033199, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@3bfbc265, request=RequestSend(header={api_key=10,api_version=0,correlation_id=3,client_id=consumer-1}, body={group_id=kafkaDirectStream}), createdTimeMs=1534267033193, sendTimeMs=1534267033193), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})

Regards
Madan


#4

This problem was resolved after the Kafka servers were restarted.

Thanks
Madan