Issue with Flume

#1

Hello All,

I submitted a Flume-ng job, but the directory /user/cloudera/flume is not getting created. I am trying to ingest data into HDFS using the exec source and a file channel, and I run the start_logs command after starting the Flume agent. Please let me know what the possible issue could be.
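
For reference, the agent is being started with a command along these lines (a rough sketch; the exact invocation and the --conf directory may differ, but the config file path matches the logs below):

flume-ng agent --name a1 --conf /home/cloudera/flume/conf --conf-file /home/cloudera/flume/conf/example.conf -Dflume.root.logger=INFO,console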

Here is the conf file

# example.conf: A single-node Flume configuration

# Name the components on this agent

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = exec
a1.sources.r1.bind = tail -f /opt/gen_logs/logs/access.log
a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type = hdfs

# Use a channel which buffers events in memory

a1.channels.c1.type = FILE
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.checkpointInterval = 300000

# Customize sink for HDFS

a1.sinks.k1.hdfs.path = /user/cloudera/flume/%y-%m-%d
a1.sinks.k1.hdfs.filePrefix = flume-%y-%m-%d
a1.sinks.k1.hdfs.rollSize = 1048576
a1.sinks.k1.hdfs.rollCount = 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 120
a1.sinks.k1.hdfs.idleTimeout = 10
a1.sinks.k1.hdfs.useLocalTimestamp = true

# Bind the source and sink to the channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


#2

@rahulabvp Please paste the logs…


#3

@Raja_Shyam Looks like there is some issue with the exec source: “Source r1 has been removed due to an error during configuration”.

Here are the logs.

16/12/31 16:13:53 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
16/12/31 16:13:53 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/cloudera/flume/conf/example.conf
16/12/31 16:13:53 INFO conf.FlumeConfiguration: Processing:k1
16/12/31 16:13:53 INFO conf.FlumeConfiguration: Processing:k1
16/12/31 16:13:53 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
16/12/31 16:13:53 INFO conf.FlumeConfiguration: Processing:k1
16/12/31 16:13:53 INFO conf.FlumeConfiguration: Processing:k1
16/12/31 16:13:53 INFO conf.FlumeConfiguration: Processing:k1
16/12/31 16:13:53 INFO conf.FlumeConfiguration: Processing:k1
16/12/31 16:13:53 INFO conf.FlumeConfiguration: Processing:k1
16/12/31 16:13:53 INFO conf.FlumeConfiguration: Processing:k1
16/12/31 16:13:53 INFO conf.FlumeConfiguration: Processing:k1
16/12/31 16:13:53 INFO conf.FlumeConfiguration: Processing:k1
16/12/31 16:13:53 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
16/12/31 16:13:53 INFO node.AbstractConfigurationProvider: Creating channels
16/12/31 16:13:53 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type FILE
16/12/31 16:13:53 INFO node.AbstractConfigurationProvider: Created channel c1
16/12/31 16:13:53 INFO source.DefaultSourceFactory: Creating instance of source r1, type exec
16/12/31 16:13:53 ERROR node.AbstractConfigurationProvider: Source r1 has been removed due to an error during configuration
java.lang.IllegalStateException: The parameter command must be specified
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.flume.source.ExecSource.configure(ExecSource.java:227)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:326)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/12/31 16:13:53 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: hdfs
16/12/31 16:13:53 INFO node.AbstractConfigurationProvider: Channel c1 connected to [k1]
16/12/31 16:13:53 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@3b1e7dd counterGroup:{ name:null counters:{} } }} channels:{c1=FileChannel c1 { dataDirs: [/home/cloudera/.flume/file-channel/data] }} }
16/12/31 16:13:53 INFO node.Application: Starting Channel c1
16/12/31 16:13:53 INFO file.FileChannel: Starting FileChannel c1 { dataDirs: [/home/cloudera/.flume/file-channel/data] }…
16/12/31 16:13:53 INFO file.Log: Encryption is not enabled
16/12/31 16:13:53 INFO file.Log: Replay started
16/12/31 16:13:53 INFO file.Log: Found NextFileID 4, from [/home/cloudera/.flume/file-channel/data/log-2, /home/cloudera/.flume/file-channel/data/log-4, /home/cloudera/.flume/file-channel/data/log-3, /home/cloudera/.flume/file-channel/data/log-1]
16/12/31 16:13:53 INFO file.EventQueueBackingStoreFileV3: Starting up with /home/cloudera/.flume/file-channel/checkpoint/checkpoint and /home/cloudera/.flume/file-channel/checkpoint/checkpoint.meta
16/12/31 16:13:53 INFO file.EventQueueBackingStoreFileV3: Reading checkpoint metadata from /home/cloudera/.flume/file-channel/checkpoint/checkpoint.meta
16/12/31 16:13:54 INFO file.FlumeEventQueue: QueueSet population inserting 0 took 0
16/12/31 16:13:54 INFO file.Log: Last Checkpoint Sat Dec 31 01:35:09 PST 2016, queue depth = 0
16/12/31 16:13:54 INFO file.Log: Replaying logs with v2 replay logic
16/12/31 16:13:54 INFO file.ReplayHandler: Starting replay of [/home/cloudera/.flume/file-channel/data/log-1, /home/cloudera/.flume/file-channel/data/log-2, /home/cloudera/.flume/file-channel/data/log-3, /home/cloudera/.flume/file-channel/data/log-4]
16/12/31 16:13:54 INFO file.ReplayHandler: Replaying /home/cloudera/.flume/file-channel/data/log-1
16/12/31 16:13:54 INFO tools.DirectMemoryUtils: Unable to get maxDirectMemory from VM: NoSuchMethodException: sun.misc.VM.maxDirectMemory(null)
16/12/31 16:13:54 INFO tools.DirectMemoryUtils: Direct Memory Allocation: Allocation = 1048576, Allocated = 0, MaxDirectMemorySize = 18874368, Remaining = 18874368
16/12/31 16:13:54 INFO file.LogFile: Checkpoint for file(/home/cloudera/.flume/file-channel/data/log-1) is: 1483175541726, which is beyond the requested checkpoint time: 1483176442134 and position 0
16/12/31 16:13:54 INFO file.ReplayHandler: Replaying /home/cloudera/.flume/file-channel/data/log-2
16/12/31 16:13:54 INFO file.LogFile: Checkpoint for file(/home/cloudera/.flume/file-channel/data/log-2) is: 1483175679803, which is beyond the requested checkpoint time: 1483176442134 and position 0
16/12/31 16:13:54 INFO file.ReplayHandler: Replaying /home/cloudera/.flume/file-channel/data/log-3
16/12/31 16:13:54 INFO file.LogFile: Checkpoint for file(/home/cloudera/.flume/file-channel/data/log-3) is: 1483176013130, which is beyond the requested checkpoint time: 1483176442134 and position 0
16/12/31 16:13:54 INFO file.ReplayHandler: Replaying /home/cloudera/.flume/file-channel/data/log-4
16/12/31 16:13:54 INFO file.LogFile: Checkpoint for file(/home/cloudera/.flume/file-channel/data/log-4) is: 1483176442134, which is beyond the requested checkpoint time: 1483176442134 and position 0
16/12/31 16:13:54 INFO file.ReplayHandler: read: 0, put: 0, take: 0, rollback: 0, commit: 0, skip: 0, eventCount:0
16/12/31 16:13:54 INFO file.FlumeEventQueue: Search Count = 0, Search Time = 0, Copy Count = 0, Copy Time = 0
16/12/31 16:13:54 INFO file.Log: Rolling /home/cloudera/.flume/file-channel/data
16/12/31 16:13:54 INFO file.Log: Roll start /home/cloudera/.flume/file-channel/data
16/12/31 16:13:54 INFO file.LogFile: Opened /home/cloudera/.flume/file-channel/data/log-5
16/12/31 16:13:54 INFO file.Log: Roll end
16/12/31 16:13:54 INFO file.EventQueueBackingStoreFile: Start checkpoint for /home/cloudera/.flume/file-channel/checkpoint/checkpoint, elements to sync = 0
16/12/31 16:13:54 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1483229633973, queueSize: 0, queueHead: 0
16/12/31 16:13:54 INFO file.Log: Updated checkpoint for file: /home/cloudera/.flume/file-channel/data/log-5 position: 0 logWriteOrderID: 1483229633973
16/12/31 16:13:54 INFO file.FileChannel: Queue Size after replay: 0 [channel=c1]
16/12/31 16:13:54 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
16/12/31 16:13:54 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
16/12/31 16:13:54 INFO node.Application: Starting Sink k1
16/12/31 16:13:54 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
16/12/31 16:13:54 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
^C16/12/31 16:17:40 INFO lifecycle.LifecycleSupervisor: Stopping lifecycle supervisor 10
16/12/31 16:17:40 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider stopping
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 stopped
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: k1. sink.start.time == 1483229634230
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: k1. sink.stop.time == 1483229860217
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: k1. sink.batch.complete == 0
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: k1. sink.batch.empty == 48
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: k1. sink.batch.underflow == 0
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: k1. sink.connection.closed.count == 0
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: k1. sink.connection.creation.count == 0
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: k1. sink.connection.failed.count == 0
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: k1. sink.event.drain.attempt == 0
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: k1. sink.event.drain.sucess == 0
16/12/31 16:17:40 INFO file.FileChannel: Stopping FileChannel c1 { dataDirs: [/home/cloudera/.flume/file-channel/data] }…
16/12/31 16:17:40 INFO file.EventQueueBackingStoreFile: Start checkpoint for /home/cloudera/.flume/file-channel/checkpoint/checkpoint, elements to sync = 0
16/12/31 16:17:40 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1483229633974, queueSize: 0, queueHead: 0
16/12/31 16:17:40 INFO file.Log: Updated checkpoint for file: /home/cloudera/.flume/file-channel/data/log-5 position: 0 logWriteOrderID: 1483229633974
Attempting to shutdown background worker.
16/12/31 16:17:40 INFO file.Log: Attempting to shutdown background worker.
16/12/31 16:17:40 INFO file.LogFile: Closing /home/cloudera/.flume/file-channel/data/log-5
16/12/31 16:17:40 INFO file.LogFile: Closing RandomReader /home/cloudera/.flume/file-channel/data/log-1
16/12/31 16:17:40 INFO file.LogFile: Closing RandomReader /home/cloudera/.flume/file-channel/data/log-2
16/12/31 16:17:40 INFO file.LogFile: Closing RandomReader /home/cloudera/.flume/file-channel/data/log-3
16/12/31 16:17:40 INFO file.LogFile: Closing RandomReader /home/cloudera/.flume/file-channel/data/log-4
16/12/31 16:17:40 INFO file.LogFile: Closing RandomReader /home/cloudera/.flume/file-channel/data/log-5
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 stopped
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: c1. channel.start.time == 1483229634229
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: c1. channel.stop.time == 1483229860259
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: c1. channel.capacity == 20000
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: c1. channel.current.size == 0
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.attempt == 0
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.success == 0
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.attempt == 48
16/12/31 16:17:40 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.success == 0


#4

@rahulabvp Please avoid specifying the channels multiple times; the a1.sources.r1.channels = c1 binding appears twice in the configuration.
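
In addition, the stack trace above shows why nothing reaches /user/cloudera/flume: the exec source was removed because no command property was set (the tail command had been assigned to bind instead of command). A corrected sketch of the relevant lines follows; only the changed lines are shown, and the paths and tail command are taken from the original post, so treat this as a sketch rather than a verified fix.

# The exec source reads its shell command from "command", not "bind"
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /opt/gen_logs/logs/access.log
# Bind the source to the channel once; remove the duplicate
# "a1.sources.r1.channels = c1" line at the end of the file
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

After restarting the agent and running start_logs again, the output directory can be checked with hdfs dfs -ls /user/cloudera/flume.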
