Flume-kafka-spark integration


Hello Team,
I tried to integrate kafka and Spark by writing the files to both HDFS and Kafka sink. I was able to write to both however, i did not get the output in spark. i verified the code and everything looks good. can someone please point out to my mistake?
Also, how do we differenciate between header and body from the data that we receive from
tail -F /opt/gen_logs/logs/access.log?

Flume config File: /home/manjunath/kafkaflume.conf
python script: /home/manjunath/kafkaStreaming.py

spark-submit --master yarn
–conf spark.ui.port=12897
–jars “/usr/hdp/,/usr/hdp/,/usr/hdp/” kafkaStreaming.py


can you share the conf file and python script as well


Hi Sunil, I mentioned the path where conf and python files are stored. However, providing both of them for your reference
Config file

Name the components on this agent

fsk.sources = webogs
fsk.sinks = sk_hds kafka
fsk.channels = ch_hds ch_kfk
#fsk.channels = mem

Describe/configure the source

fsk.sources.webogs.type = exec
#fsk.sources.webogs.command = tail -F /opt/gen_logs/logs/access.log | nc -lk gw02.itversity.com 19801
fsk.sources.webogs.command = tail -F /opt/gen_logs/logs/access.log

Describe the sink

fsk.sinks.sk_hds.type = hdfs
#fs.sinks.sk_hds.hdfs.path = hdfs://nn01.itversity.com:8020/user/manjunath/flumeSpark_%Y-%m-%d
fsk.sinks.sk_hds.hdfs.path = /user/manjunath/KafkaSpark_%Y-%m-%d
fsk.sinks.sk_hds.hdfs.fileType = DataStream
fsk.sinks.sk_hds.hdfs.rollInterval = 60
fsk.sinks.sk_hds.hdfs.rollSize = 0
fsk.sinks.sk_hds.hdfs.rollCount = 0
fsk.sinks.sk_hds.hdfs.filePrefix = CMOR_Retail
fsk.sinks.sk_hds.hdfs.fileSuffix = .txt
fsk.sinks.sk_hds.hdfs.useLocalTimeStamp = true

#Spark sink configuration

fsk.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
fsk.sinks.kafka.brokerList = wn01.itversity.com:6667,wn02.itversity.com:6667,wn03.itversity.com:6667,wn04.itversity.com:6667
fsk.sinks.kafka.topic = spkkafkaIntegration

Use a channel which buffers events in memory

fsk.channels.ch_hds.type = memory
fsk.channels.ch_hds.capacity = 100
fsk.channels.ch_hds.transactionCapacity = 100

fsk.channels.ch_kfk.type = memory
fsk.channels.ch_kfk.capacity = 100
fsk.channels.ch_kfk.transactionCapacity = 100

Bind the source and sink to the channel

fsk.sources.webogs.channels = ch_hds ch_kfk
fsk.sinks.sk_hds.channel = ch_hds
fsk.sinks.kafka.channel = ch_kfk

Python Script:
from pyspark import SparkContext,SparkConf,StorageLevel
from pyspark.streaming import *
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf().setMaster(“yarn-client”).setAppName(“Kafka Streaming Count By Department”)
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc,30)

lines = KafkaUtils.createDirectStream(ssc,[“pkkafkaIntegration”],{“metadata.broker.list”:“wn01.itversity.com:6667,wn02.itversity.com:6667,wn03.itversity.com:6667,wn04.itversity.com:6667”})

#topic = [“pkkafkaIntegration”]
#brokers = {{“metadata.broker.list”:“wn01.itversity.com:6667,wn02.itversity.com:6667,wn03.itversity.com:6667,wn04.itversity.com:6667”}
#lines = KafkaUtils.createDirectStream(ssc, topic, brokers)

#lines = FlumeUtils.createPollingStream(ssc,[(“gw02.itversity.com”,19801)])

message = lines.map(lambda s : s[1])

deptData = message.filter(lambda s : s.split()[6].split(’/’)[1] ==‘department’)
deptTuple = deptData.map(lambda s : (s.split()[6].split(’/’)[2],1) )
#deptCount = deptTuple.reduceByKeyAndWindow(lambda x,y:x+y,10,30)
#deptPrint = deptCount.pprint()
deptCount = deptTuple.reduceByKey(lambda x,y: x+y)
deptFile = deptCount.saveAsTextFiles(’/user/manjunath/Integration_KAFKA_SPK’)


Have you already created the topic “pkkafkaIntegration” ?
can you check, if it exists?

kafka-topics.sh --list --zookeeper “nn01.itversity.com:2181,nn02.itversity.com:2181,rm01.itversity.com:2181” --topic pkkafkaIntegration


Kafka output is a tuple of header and body. You need to extract the body. For example, a sample message from Kafka broker is as follows:

(None, u’ - - [01/Jan/2019:13:46:56 -0800] “GET /department/fitness/products HTTP/1.1” 404 761 “-” “Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36”’)

Header -None

Body - - - [01/Jan/2019:13:46:56 -0800] “GET /department/fitness/products HTTP/1.1” 404 761 “-” “Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36”

tail -f, is a linux command to get the newest out from “access.log”