Connecting to Kafka from PySpark


#1

Hi,

I am writing a PySpark consumer application where I am trying to pull messages from a topic in Kafka.
Scala version - 2.11.8
Spark version - 2.2.1
jar - spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar

When passing the parameters below to KafkaUtils.createDirectStream:
"“metadata.broker.list”: “brokerServer:9200”
“security.protocol”: “sasl_ssl”,
“sasl.mechanisms”: “PLAIN”,
“sasl.username”: username
“sasl.password”: password
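
Roughly, my consumer code looks like this (the topic name and credentials are placeholders; the broker address is as above):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

username = "<username>"   # placeholder
password = "<password>"   # placeholder

sc = SparkContext(appName="kafka-sasl-consumer")
ssc = StreamingContext(sc, 10)  # 10-second batches

# Same parameters as listed above
kafkaParams = {
    "metadata.broker.list": "brokerServer:9200",
    "security.protocol": "sasl_ssl",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": username,
    "sasl.password": password,
}

stream = KafkaUtils.createDirectStream(ssc, ["my_topic"], kafkaParams)
stream.map(lambda kv: kv[1]).pprint()  # print the message values

ssc.start()
ssc.awaitTermination()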

I get the below error
“py4j.protocol.Py4JJavaError: An error occurred while calling o26.createDirectStreamWithoutMessageHandler.
: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.”

However, I can read messages from Kafka through plain Python code using Confluent's Python Client for Apache Kafka.
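
For comparison, the working confluent-kafka consumer is roughly this (group id, topic name, and credentials are placeholders):

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "brokerServer:9200",
    "group.id": "test-consumer",          # placeholder group id
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<username>",        # placeholder
    "sasl.password": "<password>",        # placeholder
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["my_topic"])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(msg.error())
        continue
    print(msg.value().decode("utf-8"))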

What am I missing in PySpark? Why am I failing to connect through PySpark code?
Any update? Please help.

Thanks
Ananthu


#2

Hello Ananthu,

Why have you configured the security protocol as SSL with a username and password? Where are you running this code? If you are running it locally, you don't need to pass all these parameters while creating the DStream.
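
For a local, unsecured broker the call can be as simple as this (assuming a broker on localhost:9092 and a placeholder topic name):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="local-kafka-test")
ssc = StreamingContext(sc, 10)

# No security parameters needed for an unsecured local broker
stream = KafkaUtils.createDirectStream(
    ssc, ["my_topic"], {"metadata.broker.list": "localhost:9092"})
stream.pprint()

ssc.start()
ssc.awaitTermination()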


#3

Hi Sunil,

The Kafka cluster is not running locally. It is hosted in the cloud on the company's network, hence the SASL configuration.

Is any solution possible?

Thanks and Regards,
Ananthu