Flume-kafka error


#1

trying to write to kafka topic using flume

conf file:

Name the components on this agent

wk.sources = ws
wk.sinks = kafka
wk.channels = mem

describe the Sources

wk.sources.ws.type = exec
wk.sources.ws.command = tail -F /home/kirantadisetti/gen_logs/logs/access.log

describe the sink

wk.sinks.kafka.type = org.apache.flume.sink.kafka.kafkaSink
wk.sinks.kafka.brokerList = wn01.itversity.com:6667,wn02.itversity.com:6667,wn03.itversity.com:6667,wn04.itversity.com:6667
wk.sinks.kafka.topic = mytopic_flu

describe the channel

wk.channels.mem.type = memory
wk.channels.mem.capaciy = 1000
wk.channels.mem.transactionCapacity = 100

bind the sources and sinks to the channel

wk.sources.ws.channels = mem
wk.sinks.kafka.channels = mem

------log as below

flume-ng agent --name wk --conf-file /home/kirantadisetti/flume_kafka.conf

error: while starting the agent

2018-11-20 02:53:35,547 main ERROR RollingFileManager (/tmp/flume/flume.log) java.io.FileNotFoundException: /tmp/flume/flume.log (Permission denied) java.io.FileNotFoundException: /tmp/flume/flume.log (Permission denied)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.(FileOutputStream.java:213)
at java.io.FileOutputStream.(FileOutputStream.java:133)
at org.apache.logging.log4j.core.appender.rolling.RollingFileManager$RollingFileManagerFactory.createManager(RollingFileManager.java:640)
at org.apache.logging.log4j.core.appender.rolling.RollingFileManager$RollingFileManagerFactory.createManager(RollingFileManager.java:608)
at org.apache.logging.log4j.core.appender.AbstractManager.getManager(AbstractManager.java:113)
at org.apache.logging.log4j.core.appender.OutputStreamManager.getManager(OutputStreamManager.java:115)
at org.apache.logging.log4j.core.appender.rolling.RollingFileManager.getFileManager(RollingFileManager.java:188)
at org.apache.logging.log4j.core.appender.RollingFileAppender$Builder.build(RollingFileAppender.java:144)
at org.apache.logging.log4j.core.appender.RollingFileAppender$Builder.build(RollingFileAppender.java:60)
at org.apache.logging.log4j.core.config.plugins.util.PluginBuilder.build(PluginBuilder.java:122)
at org.apache.logging.log4j.core.config.AbstractConfiguration.createPluginObject(AbstractConfiguration.java:958)
at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:898)
at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:890)
at org.apache.logging.log4j.core.config.AbstractConfiguration.doConfigure(AbstractConfiguration.java:513)
at org.apache.logging.log4j.core.config.AbstractConfiguration.initialize(AbstractConfiguration.java:237)
at org.apache.logging.log4j.core.config.AbstractConfiguration.start(AbstractConfiguration.java:249)
at org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:545)
at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:617)
at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:634)
at org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:229)
at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:153)
at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:45)
at org.apache.logging.log4j.LogManager.getContext(LogManager.java:194)
at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:122)
at org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:43)
at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:46)
at org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:29)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:242)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:254)
at org.apache.flume.node.Application.(Application.java:59)

2018-11-20 02:53:35,551 main ERROR Could not create plugin of type class org.apache.logging.log4j.core.appender.RollingFileAppender for element RollingFile: java.lang.IllegalStateException: ManagerFactory [org.apache.logging.log4j.core.appender.rolling.RollingFileManager$RollingFileManagerFactory@87a85e1] unable to create manager for [/tmp/flume/flume.log] with data [org.apache.logging.log4j.core.appender.rolling.RollingFileManager$FactoryData@671a5887[pattern=/tmp/flume/archive/flume.log.%d{yyyyMMdd}-%i, append=true, bufferedIO=true, bufferSize=8192, policy=CompositeTriggeringPolicy(policies=[SizeBasedTriggeringPolicy(size=104857600), CronTriggeringPolicy(schedule=0 0 0 * * ?)]), strategy=DefaultRolloverStrategy(min=1, max=20, useMax=true), advertiseURI=null, layout=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %equals{%x}{[]}{} - %m%n, filePermissions=null, fileOwner=null]] java.lang.IllegalStateException: ManagerFactory [org.apache.logging.log4j.core.appender.rolling.RollingFileManager$RollingFileManagerFactory@87a85e1] unable to create manager for [/tmp/flume/flume.log] with data [org.apache.logging.log4j.core.appender.rolling.RollingFileManager$FactoryData@671a5887[pattern=/tmp/flume/archive/flume.log.%d{yyyyMMdd}-%i, append=true, bufferedIO=true, bufferSize=8192, policy=CompositeTriggeringPolicy(policies=[SizeBasedTriggeringPolicy(size=104857600), CronTriggeringPolicy(schedule=0 0 0 * * ?)]), strategy=DefaultRolloverStrategy(min=1, max=20, useMax=true), advertiseURI=null, layout=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %equals{%x}{[]}{} - %m%n, filePermissions=null, fileOwner=null]]
at org.apache.logging.log4j.core.appender.AbstractManager.getManager(AbstractManager.java:115)
at org.apache.logging.log4j.core.appender.OutputStreamManager.getManager(OutputStreamManager.java:115)
at org.apache.logging.log4j.core.appender.rolling.RollingFileManager.getFileManager(RollingFileManager.java:188)
at org.apache.logging.log4j.core.appender.RollingFileAppender$Builder.build(RollingFileAppender.java:144)
at org.apache.logging.log4j.core.appender.RollingFileAppender$Builder.build(RollingFileAppender.java:60)
at org.apache.logging.log4j.core.config.plugins.util.PluginBuilder.build(PluginBuilder.java:122)
at org.apache.logging.log4j.core.config.AbstractConfiguration.createPluginObject(AbstractConfiguration.java:958)
at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:898)
at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:890)
at org.apache.logging.log4j.core.config.AbstractConfiguration.doConfigure(AbstractConfiguration.java:513)
at org.apache.logging.log4j.core.config.AbstractConfiguration.initialize(AbstractConfiguration.java:237)
at org.apache.logging.log4j.core.config.AbstractConfiguration.start(AbstractConfiguration.java:249)
at org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:545)
at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:617)
at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:634)
at org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:229)
at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:153)
at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:45)
at org.apache.logging.log4j.LogManager.getContext(LogManager.java:194)
at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:122)
at org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:43)
at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:46)
at org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:29)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:242)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:254)
at org.apache.flume.node.Application.(Application.java:59)

2018-11-20 02:53:35,553 main ERROR Unable to invoke factory method in class org.apache.logging.log4j.core.appender.RollingFileAppender for element RollingFile: java.lang.IllegalStateException: No factory method found for class org.apache.logging.log4j.core.appender.RollingFileAppender java.lang.IllegalStateException: No factory method found for class org.apache.logging.log4j.core.appender.RollingFileAppender
at org.apache.logging.log4j.core.config.plugins.util.PluginBuilder.findFactoryMethod(PluginBuilder.java:229)
at org.apache.logging.log4j.core.config.plugins.util.PluginBuilder.build(PluginBuilder.java:134)
at org.apache.logging.log4j.core.config.AbstractConfiguration.createPluginObject(AbstractConfiguration.java:958)
at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:898)
at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:890)
at org.apache.logging.log4j.core.config.AbstractConfiguration.doConfigure(AbstractConfiguration.java:513)
at org.apache.logging.log4j.core.config.AbstractConfiguration.initialize(AbstractConfiguration.java:237)
at org.apache.logging.log4j.core.config.AbstractConfiguration.start(AbstractConfiguration.java:249)
at org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:545)
at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:617)
at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:634)
at org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:229)
at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:153)
at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:45)
at org.apache.logging.log4j.LogManager.getContext(LogManager.java:194)
at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:122)
at org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:43)
at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:46)
at org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:29)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:242)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:254)
at org.apache.flume.node.Application.(Application.java:59)

2018-11-20 02:53:35,554 main ERROR Null object returned for RollingFile in Appenders.
2018-11-20 02:53:35,796 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
2018-11-20 02:53:35,800 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:/home/kirantadisetti/flume_kafka.conf
2018-11-20 02:53:35,805 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1019)] Processing:kafka
2018-11-20 02:53:35,806 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:933)] Added sinks: kafka Agent: wk
2018-11-20 02:53:35,806 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1019)] Processing:kafka
2018-11-20 02:53:35,807 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1019)] Processing:kafka
2018-11-20 02:53:35,807 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1019)] Processing:kafka
2018-11-20 02:53:35,818 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:700)] Could not configure sink kafka due to: No channel configured for sink: kafka
org.apache.flume.conf.ConfigurationException: No channel configured for sink: kafka
at org.apache.flume.conf.sink.SinkConfiguration.configure(SinkConfiguration.java:52) ~[flume-ng-configuration-1.5.2.2.6.5.0-292.jar:1.5.2.2.6.5.0-292]
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:683) [flume-ng-configuration-1.5.2.2.6.5.0-292.jar:1.5.2.2.6.5.0-292]
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:347) [flume-ng-configuration-1.5.2.2.6.5.0-292.jar:1.5.2.2.6.5.0-292]
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:212) [flume-ng-configuration-1.5.2.2.6.5.0-292.jar:1.5.2.2.6.5.0-292]
at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:126) [flume-ng-configuration-1.5.2.2.6.5.0-292.jar:1.5.2.2.6.5.0-292]
at org.apache.flume.conf.FlumeConfiguration.(FlumeConfiguration.java:108) [flume-ng-configuration-1.5.2.2.6.5.0-292.jar:1.5.2.2.6.5.0-292]
at org.apache.flume.node.PropertiesFileConfigurationProvider.getFlumeConfiguration(PropertiesFileConfigurationProvider.java:193) [flume-ng-node-1.5.2.2.6.5.0-292.jar:1.5.2.2.6.5.0-292]
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:94) [flume-ng-node-1.5.2.2.6.5.0-292.jar:1.5.2.2.6.5.0-292]
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140) [flume-ng-node-1.5.2.2.6.5.0-292.jar:1.5.2.2.6.5.0-292]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_151]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_151]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_151]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_151]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_151]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_151]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
2018-11-20 02:53:35,823 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [wk]
2018-11-20 02:53:35,823 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:150)] Creating channels
2018-11-20 02:53:35,830 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:40)] Creating instance of channel mem type memory
2018-11-20 02:53:35,834 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel mem
2018-11-20 02:53:35,834 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:39)] Creating instance of source ws, type exec
2018-11-20 02:53:35,844 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:119)] Channel mem connected to [ws]
2018-11-20 02:53:35,849 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{ws=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:ws,state:IDLE} }} sinkRunners:{} channels:{mem=org.apache.flume.channel.MemoryChannel{name: mem}} }
2018-11-20 02:53:35,856 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel mem
2018-11-20 02:53:35,857 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: mem: Successfully registered new MBean.
2018-11-20 02:53:35,857 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: mem started
2018-11-20 02:53:35,858 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source ws
2018-11-20 02:53:35,858 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:163)] Exec source starting with command: tail -F /opt/gen_logs/logs/access.log
2018-11-20 02:53:35,859 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: ws: Successfully registered new MBean.
2018-11-20 02:53:35,859 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: ws started


#2

@Kiran_Tadisetti There are many typos in your conf file.

use conf file as below:

#Name the components on this agent
wk.sources = ws
wk.sinks = kafka
wk.channels = mem

#describe the Sources
wk.sources.ws.type = exec
wk.sources.ws.command = tail -F /opt/gen_logs/logs/access.log

#describe the sink
wk.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
wk.sinks.kafka.brokerList = wn01.itversity.com:6667,wn02.itversity.com:6667,wn03.itversity.com:6667
wk.sinks.kafka.topic = flume_kafka_demo

#describe the channel
wk.channels.mem.type = memory
wk.channels.mem.capacity = 1000
wk.channels.mem.transactionCapacity = 100

#bind the sources and sinks to the channel
wk.sources.ws.channels = mem
wk.sinks.kafka.channel = mem

And run kafka consumer as below:
kafka-console-consumer.sh --bootstrap-server wn01.itversity.com:6667,wn02.itversity.com:6667,wn03.itversity.com:6667 --topic flume_kafka_demo --from-beginning