Flume-Kafka integration


#1

Hi All,

I am facing an issue while integrating Flume with Kafka: the topic is not being created in Kafka via Flume, so no data is arriving in Kafka either.

To test Flume, I configured multiple sinks (including HDFS). Data is being written to HDFS but not to Kafka.

Kindly help if you know the solution or have faced a similar problem.

Regards
Burhanuddin Pithawala


#2

I am using this configuration file, fmp.conf:


#describe components

fmp.sources = logSource
fmp.sinks = loggerSink hdfsSink myKafkaSink
fmp.channels = loggerChannel hdfsChannel myKafkaChannel

#configuring sources

fmp.sources.logSource.type = exec
fmp.sources.logSource.command = tail -F /opt/gen_logs/logs/access.log

#configure logger sink
fmp.sinks.loggerSink.type = logger

#configure hdfs sink
fmp.sinks.hdfsSink.type = hdfs
fmp.sinks.hdfsSink.hdfs.path = hdfs://nn01.itversity.com:8020/user/hadoop_lab_itversity/flume/kafka/data_%Y-%m-%d
fmp.sinks.hdfsSink.hdfs.rollSize = 1024
fmp.sinks.hdfsSink.hdfs.rollCount = 10
fmp.sinks.hdfsSink.hdfs.rollInterval = 30
fmp.sinks.hdfsSink.hdfs.fileType = DataStream
fmp.sinks.hdfsSink.hdfs.filePrefix = retail
fmp.sinks.hdfsSink.hdfs.fileSuffix = .txt
fmp.sinks.hdfsSink.hdfs.inUseSuffix = .tmp
fmp.sinks.hdfsSink.hdfs.useLocalTimeStamp = true

#configure kafka sink
fmp.sinks.myKafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
fmp.sinks.myKafkaSink.kafka.bootstrap.servers = nn01.itversity.com:6667,nn02.itversity.com:6667,rm01.itversity.com:6667
fmp.sinks.myKafkaSink.flumeBatchSize = 5
fmp.sinks.myKafkaSink.kafka.producer.acks = 0
fmp.sinks.myKafkaSink.kafka.topic = myKafka3

#configure logger channels
fmp.channels.loggerChannel.type = memory
fmp.channels.loggerChannel.capacity = 100
fmp.channels.loggerChannel.transactionCapacity = 100

#configure hdfs channels
fmp.channels.hdfsChannel.type = memory
fmp.channels.hdfsChannel.capacity = 1000
fmp.channels.hdfsChannel.transactionCapacity = 100

#configure kafka channel
fmp.channels.myKafkaChannel.type = memory
fmp.channels.myKafkaChannel.capacity = 10
fmp.channels.myKafkaChannel.transactionCapacity = 10

#binding source and sink to the channel
fmp.sources.logSource.channels = loggerChannel hdfsChannel
fmp.sinks.loggerSink.channel = loggerChannel
fmp.sinks.hdfsSink.channel = hdfsChannel
fmp.sinks.myKafkaSink.channel = myKafkaChannel
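
Note on the bindings above: logSource lists only loggerChannel and hdfsChannel, so no events are ever put into myKafkaChannel and myKafkaSink has nothing to send. That would match the symptom of HDFS receiving data while the topic never appears. A minimal sketch of a possible fix follows, assuming the default replicating channel selector is intended (every event copied to all three channels). The larger Kafka channel sizes are an assumption as well: the exec source's default batchSize is 20, which would not fit in a transactionCapacity of 10.

```properties
#bind the source to all three channels, including the Kafka channel
fmp.sources.logSource.channels = loggerChannel hdfsChannel myKafkaChannel

#assumed values: sized so one source batch (default batchSize 20) fits in a transaction
fmp.channels.myKafkaChannel.capacity = 1000
fmp.channels.myKafkaChannel.transactionCapacity = 100
```

Whether the topic is then created on the first write still depends on the broker allowing automatic topic creation (auto.create.topics.enable); if that is disabled, myKafka3 would have to be created manually first.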