Flume and Kafka - Failed to fetch metadata

Hi All,

I am running the below command:
flume-ng agent --name fmp -c /home/rajeshv28/wlabssa -f flka.conf

I am getting the below error. Do you have any idea why it is failing?

ERROR async.DefaultEventHandler: Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: default-flume-topic

17/05/15 23:58:34 ERROR async.DefaultEventHandler: Failed to send requests for topics default-flume-topic with correlation ids in [25,32]
17/05/15 23:58:34 ERROR kafka.KafkaSink: Failed to publish events
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:96)
at kafka.producer.Producer.send(Producer.scala:93)
at kafka.javaapi.producer.Producer.send(Producer.scala:44)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:129)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
17/05/15 23:58:34 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:150)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:96)
at kafka.producer.Producer.send(Producer.scala:93)
at kafka.javaapi.producer.Producer.send(Producer.scala:44)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:129)

Here is my flka.conf file:

# Name the components on this agent

fmp.sources = logsource
fmp.sinks = kafkasink hdfssink
fmp.channels = kafkachannel hdfschannel

# Describe/configure the source

fmp.sources.logsource.type = exec
fmp.sources.logsource.command = tail -F /opt/gen_logs/logs/access.log

# Describe the kafka sink

fmp.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
fmp.sinks.kafkasink.brokerList = nn02.itversity.com:6667
#fmp.sinks.kafkasink.topic = kafkadg

# Describe the HDFS sink

fmp.sinks.hdfssink.type = hdfs
fmp.sinks.hdfssink.hdfs.path = hdfs://nn01.itversity.com:8020/user/rajeshv28/wlabssa/flume1_example_%Y-%m-%d
fmp.sinks.hdfssink.hdfs.fileType = DataStream
fmp.sinks.hdfssink.hdfs.rollInterval = 120
fmp.sinks.hdfssink.hdfs.rollSize = 10485760
fmp.sinks.hdfssink.hdfs.rollCount = 100
fmp.sinks.hdfssink.hdfs.filePrefix = retail
fmp.sinks.hdfssink.hdfs.fileSuffix = .txt
fmp.sinks.hdfssink.hdfs.inUseSuffix = .tmp
fmp.sinks.hdfssink.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory for kafkasink

fmp.channels.kafkachannel.type = memory
fmp.channels.kafkachannel.capacity = 1000
fmp.channels.kafkachannel.transactionCapacity = 100

# Use a channel which buffers events in file for hdfssink

fmp.channels.hdfschannel.type = file
fmp.channels.hdfschannel.capacity = 1000
fmp.channels.hdfschannel.transactionCapacity = 100

# Bind the source and sinks to the channels

fmp.sources.logsource.channels = hdfschannel kafkachannel
fmp.sinks.kafkasink.channel = kafkachannel
fmp.sinks.hdfssink.channel = hdfschannel
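
One detail worth noting here: the topic property of the Kafka sink is commented out, so the sink falls back to its built-in default of default-flume-topic, which is exactly the topic named in the error. If the topic is meant to be kafkadg, that line needs to be uncommented. A minimal sketch of just the sink block (the rest of the config stays as-is):

# Describe the kafka sink, with an explicit topic instead of the
# default-flume-topic fallback:
fmp.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
fmp.sinks.kafkasink.brokerList = nn02.itversity.com:6667
fmp.sinks.kafkasink.topic = kafkadg

If the metadata fetch still fails with an explicit topic, the topic likely does not exist on the broker (or auto-creation is disabled), or the broker itself is not reachable on nn02.itversity.com:6667.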

Did you find the answer to this?

No.

ITVersity,

Can you please help me on this?

Thanks,
Raj

ITVersity,

Could you please help me on this issue?

Thanks,
Raj

Hi Rajesh,

Run the below command to make sure ZooKeeper and the Kafka broker are running on the port you have mentioned in your configuration:

netstat -a | grep <port>

Please let me know if you still face any issue.
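
For the broker in the config above, the check would look something like this, run on the host where each service is supposed to be listening (nn02.itversity.com for the broker in this config); 2181 is an assumption for ZooKeeper's client port, since the config only names the broker port:

# Kafka broker port taken from brokerList in flka.conf:
netstat -an | grep 6667

# ZooKeeper, assuming the default client port 2181:
netstat -an | grep 2181

No LISTEN entry for a port means that service is not up on that host, which would explain the failed metadata fetch in the log above.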

Hi Vinod,

Thank you for your reply.

I am using the ITVersity labs and am not able to run the command you suggested, as I have a permission issue.

[rajeshv28@gw01 ~]$ netstat -a | grep port
/proc/net/tcp: Permission denied

Thanks,
Rajesh
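
If netstat stays blocked on the gateway, a plain TCP connect test can stand in for it, since it does not need to read /proc/net/tcp (assuming nc is installed on the gateway host):

# Zero-I/O connect test against the broker from the sink config:
nc -zv nn02.itversity.com 6667

A successful connect means the broker is reachable from where Flume runs; a timeout or refusal points at the broker itself or at the port.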

Try now and let us know if there is any issue.
If the issue persists, first change the topic name and retry. If the new topic works, make sure to delete the old one.
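
For the topic steps, the old-style Kafka CLI that matches this producer API can be used along these lines. The ZooKeeper address below is an assumption (the broker host with the default client port 2181) and kafkadg2 is just a placeholder name; adjust both to your cluster:

# List existing topics to see whether the target topic exists at all:
kafka-topics.sh --zookeeper nn02.itversity.com:2181 --list

# Create a fresh topic to retry with:
kafka-topics.sh --zookeeper nn02.itversity.com:2181 --create \
  --topic kafkadg2 --partitions 1 --replication-factor 1

# Once the new topic works, delete the old one (on older brokers this
# only takes effect if delete.topic.enable=true):
kafka-topics.sh --zookeeper nn02.itversity.com:2181 --delete --topic kafkadg

Whichever topic you end up with, point fmp.sinks.kafkasink.topic at it and restart the agent.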