Flume Ingestion for Directories and Subdirectories


#1

I have a folder structure like this:

2017
2017/05
2017/05/25
2017/05/25/01
2017/05/25/01/tag_data_2017_05_25_01_32_34.txt

In short: 2017/05/25/01/filename.txt (YYYY/MM/DD/HH/filename.txt)

I want to ingest the data from the local system into Hadoop using Flume, keeping the same directory structure. But Flume is ignoring the directories and picking up only the files. Is there anything I need to change in my configuration?

Flume version: 1.7.0

agent1.channels.fileChannel1_1.type = file
agent1.channels.fileChannel1_1.capacity = 200000
agent1.channels.fileChannel1_1.transactionCapacity = 1000

agent1.sources.source1_1.type = spooldir

agent1.sources.source1_1.spoolDir = /home/user/Desktop/test/
agent1.sources.source1_1.fileHeader = false
agent1.sinks.hdfs-sink1_1.type = hdfs

agent1.sinks.hdfs-sink1_1.hdfs.path = /user/local/hdfs/
agent1.sinks.hdfs-sink1_1.hdfs.batchSize = 1000
agent1.sinks.hdfs-sink1_1.hdfs.rollSize = 2000
agent1.sinks.hdfs-sink1_1.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink1_1.hdfs.rollCount = 500
agent1.sinks.hdfs-sink1_1.hdfs.writeFormat=Text

agent1.sinks.hdfs-sink1_1.hdfs.fileType = DataStream
agent1.sources.source1_1.channels = fileChannel1_1
agent1.sinks.hdfs-sink1_1.channel = fileChannel1_1

agent1.sinks = hdfs-sink1_1
agent1.sources = source1_1
agent1.channels = fileChannel1_1

Kindly help.


#2

Sample config to partition data by year, month, day, and hour for a Twitter source:

TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/

You can try this similar configuration.

Let us know if it works.
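Note that the %Y/%m/%d/%H escape sequences are filled in from a timestamp header on each event, so a spooldir source normally needs a timestamp interceptor for them to resolve. A minimal sketch, reusing the agent1 names from the config in #1:

agent1.sources.source1_1.interceptors = ts
agent1.sources.source1_1.interceptors.ts.type = timestamp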


#3

Sir, can you please check my question again and brief me? I was expecting a different logic.


#4

I think I missed the file name pattern. Please try the following.

agent1.sinks.hdfs-sink1_1.hdfs.path = /user/local/hdfs/%Y/%m/%d/%H/filename_%Y_%M_%D_%H.txt

This might work. Can you try it and let me know?


#5

%Y/%m/%d/%H should be extracted from the file name, not from the current timestamp.

Let's say my file name is tag_data_2017_05_25_01_32_34.txt; the folder structure will be 2017/05/25/01, and I want Flume to write it to the same level of partitioning.


#6

@vinodnerella Also, I cannot make your config work, since the source would have to be Cloudera or Twitter.

java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null

That is the error I'm receiving.
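For what it's worth, that NullPointerException is raised when hdfs.path contains time escapes (%Y/%m/%d/%H) but the events carry no timestamp header. One workaround, reusing the sink name from the config in #1, is to let the sink fall back to the local clock instead:

agent1.sinks.hdfs-sink1_1.hdfs.useLocalTimeStamp = true

This stamps events with the ingestion time, though, so it still does not derive the partition from the file name.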


#7

Try this once:

agent1.channels.fileChannel1_1.type = file
agent1.channels.fileChannel1_1.capacity = 200000
agent1.channels.fileChannel1_1.transactionCapacity = 1000

agent1.sources.source1_1.type = spooldir

agent1.sources.source1_1.spoolDir = /home/user/Desktop/test/
agent1.sources.source1_1.fileHeader = true
agent1.sources.source1_1.fileHeaderKey = file
agent1.sinks.hdfs-sink1_1.type = hdfs

agent1.sinks.hdfs-sink1_1.hdfs.path = /user/local/hdfs/%{file}
agent1.sinks.hdfs-sink1_1.hdfs.batchSize = 1000
agent1.sinks.hdfs-sink1_1.hdfs.rollSize = 2000
agent1.sinks.hdfs-sink1_1.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink1_1.hdfs.rollCount = 500
agent1.sinks.hdfs-sink1_1.hdfs.writeFormat=Text

agent1.sinks.hdfs-sink1_1.hdfs.fileType = DataStream
agent1.sources.source1_1.channels = fileChannel1_1
agent1.sinks.hdfs-sink1_1.channel = fileChannel1_1

agent1.sinks = hdfs-sink1_1
agent1.sources = source1_1
agent1.channels = fileChannel1_1

I added the changes below to your conf:
agent1.sources.source1_1.fileHeader = true
agent1.sources.source1_1.fileHeaderKey = file
agent1.sinks.hdfs-sink1_1.hdfs.path = /user/local/hdfs/%{file}
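One caveat, assuming the stock spooldir behaviour: with fileHeader = true the file header carries the absolute path of the ingested file, so %{file} replays the whole local path on HDFS. A hypothetical expansion for the directories in #1:

# file header = /home/user/Desktop/test/2017/05/25/01/tag_data_2017_05_25_01_32_34.txt
# hdfs.path   = /user/local/hdfs/%{file}
# resulting HDFS directory:
/user/local/hdfs/home/user/Desktop/test/2017/05/25/01/tag_data_2017_05_25_01_32_34.txt/

The events land in files named from hdfs.filePrefix (FlumeData by default) inside that directory, because hdfs.path names a directory rather than a file.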


#8

Thanks everyone for the suggestions. There is an option for recursive directory search:

agent1.sources.source1_1.recursiveDirectorySearch = true

Whether to monitor subdirectories for new files to read.

Adjust the configuration file and it will search the subdirectories. Tried in Flume 1.7; it is working perfectly.
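For reference, a minimal sketch of the source block with the recursive option in place, reusing only properties already shown in this thread:

agent1.sources.source1_1.type = spooldir
agent1.sources.source1_1.spoolDir = /home/user/Desktop/test/
# Flume 1.7+: also pick up files in subdirectories of spoolDir
agent1.sources.source1_1.recursiveDirectorySearch = true
# keep the absolute source path in the "file" header for use in hdfs.path
agent1.sources.source1_1.fileHeader = true
agent1.sources.source1_1.fileHeaderKey = file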


#9

Hi,
I have enabled recursiveDirectorySearch=true to look into the subdirectories for files.

source.spoolDir=/tmp/test

and under /tmp/test, subdirectories get created with data files
/tmp/test/data1/file.csv
/tmp/test/data2/file2.csv

I want the exact sub directory structure to be created in the HDFS sink path.

/sink/data1/file.csv

/sink/data2/file2.csv

When I use %{file} for the HDFS sink file path, I get the complete absolute path, and %{basename} gives me only the file name. I want to extract the subdirectory structure relative to the spoolDir source path. Is there any way to achieve this?


#10

%{file} gives the absolute file path; is there any way to get a substring out of it?
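As far as I can tell, the bundled interceptors (regex_extractor, search-and-replace, etc.) match against the event body rather than existing headers, so there is no stock property that trims the spoolDir prefix off the file header. One workaround is a small custom interceptor that reads the file header, strips the spoolDir prefix, and writes the remaining subdirectory (e.g. data1) into a new header the sink path can reference. A sketch of the wiring only, reusing the agent1 names from earlier in the thread, with a hypothetical interceptor class (com.example.flume.SubdirInterceptor and its stripPrefix property do not ship with Flume):

# hypothetical custom interceptor: copies the subdirectory part of %{file},
# relative to /tmp/test/, into a new "subdir" header
agent1.sources.source1_1.interceptors = subdir
agent1.sources.source1_1.interceptors.subdir.type = com.example.flume.SubdirInterceptor$Builder
agent1.sources.source1_1.interceptors.subdir.stripPrefix = /tmp/test/

# partition the sink by that header and keep the original file name as the prefix
agent1.sinks.hdfs-sink1_1.hdfs.path = /sink/%{subdir}
agent1.sinks.hdfs-sink1_1.hdfs.filePrefix = %{basename}

Note that the HDFS sink will still append its own counter/suffix to the written file names when it rolls files.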