Kafka, Flume Streaming Error

We have configured our own cluster on Ubuntu 16.04. We want streaming data from IoT sensors to land in Hive directly. We have successfully configured the path with some sample data -> Kafka -> Flume -> Hive, and the data we pass through Kafka appears in the Hive table almost instantly. However, within a few insertions into the Hive table, Flume fails with: Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.OutOfMemoryError: Java heap space. I have googled and couldn't find anything on the web that works.
Workaround 1: Increase the heap size in flume-env.sh: export JAVA_OPTS="-Xms6144m -Xmx6144m -XX:NewSize=256m -XX:MaxNewSize=356m -XX:PermSize=256m -XX:MaxPermSize=356m"
System config: i7 processor, 8 GB RAM and 1 TB HDD.
Please give your suggestions. Does it work this way, or is there a better way to proceed?

How are you loading data into the Hive table?

Hi,

Actually, you can easily achieve the same thing with the flow below.
Sample Data -> Flume -> Kafka -> Spark Streaming -> Hive
or
Sample Data -> Kafka -> Spark Streaming -> Hive
I think you can remove Flume and bring more stability to the streaming by using Spark.
I hope it helps you clean up the issues.
Or, if you want, you can add 3 sinks to the same Flume channel; it will act like a load balancer and your data will drain to the sinks faster.

Thanks
ixit


Yes, I'm using the Hive sink in the Flume configuration and I'm able to do it. I've recently upgraded the RAM to 24 GB and still get the same "Java heap space" error.

Below is the Flume.conf file.

flumeagent1.sources = source_from_kafka
flumeagent1.channels = mem_channel
flumeagent1.sinks = hive_sink

# Define / Configure source

flumeagent1.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumeagent1.sources.source_from_kafka.zookeeperConnect = localhost:2181
flumeagent1.sources.source_from_kafka.topic = sale
flumeagent1.sources.source_from_kafka.groupID = flume
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sources.source_from_kafka.interceptors = i1
flumeagent1.sources.source_from_kafka.interceptors.i1.type = timestamp
flumeagent1.sources.source_from_kafka.consumer.timeout.ms = 1000

# Hive Sink

flumeagent1.sinks.hive_sink.type = hive
flumeagent1.sinks.hive_sink.hive.metastore = thrift://localhost:9083
flumeagent1.sinks.hive_sink.hive.database = customer
flumeagent1.sinks.hive_sink.hive.table = f1
flumeagent1.sinks.hive_sink.hive.txnsPerBatchAsk = 2
flumeagent1.sinks.hive_sink.hive.partition = %y-%m-%d-%H-%M
flumeagent1.sinks.hive_sink.batchSize = 10
flumeagent1.sinks.hive_sink.serializer = DELIMITED
flumeagent1.sinks.hive_sink.serializer.delimiter = ,
flumeagent1.sinks.hive_sink.serializer.fieldnames = id,name,email,street_address,company

# Use a channel which buffers events in memory

flumeagent1.channels.mem_channel.type = memory
flumeagent1.channels.mem_channel.capacity = 10000
flumeagent1.channels.mem_channel.transactionCapacity = 100
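# Note: a memory channel holds all of its buffered events in the Flume agent's JVM heap,
# so a channel filling up to its capacity contributes directly to heap usage and is a
# common place for this OutOfMemoryError to surface.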

# Bind the source and sink to the channel

flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sinks.hive_sink.channel = mem_channel

Yes, either way could be easy and stable. I have tried to do it myself and researched a lot on the web, but I couldn't find tutorials for achieving it in a short period. I'm not familiar with Spark, so for the time being I have taken the path Kafka -> Flume -> Hive. I do want to try Spark Streaming with Kafka. Do you have any good source/content for the path Data -> Kafka -> Spark -> Hive? If anyone has that, please help me.

You can check the currently running live training videos, Big Data certification workshop # 30 to 33. Durga sir has explained Flume, Kafka and Spark Streaming there. Check those; they might help you.

Yes, thanks. I have already added it to my playlist to watch later. Does anyone have a solution to the problem?

Note: I have achieved this using Kafka -> Flume -> HDFS, writing to Hive later.

@Raj, I have a question: since Hive is just a layer on top of HDFS, are you storing your data in an HDFS directory and then running Hive commands on it?

I'm storing the data in HDFS, and the storage path is the Hive default warehouse, so whenever new data arrives via Kafka -> Flume -> HDFS -> Hive warehouse it is reflected in Hive. Is that what you wanted to know, or do you need more info?
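For reference, the HDFS-sink variant of that flow might look roughly like the sketch below. It is only a sketch: the warehouse path, the sink name hdfs_sink and the roll settings are assumptions, not the exact configuration used here.

flumeagent1.sinks.hdfs_sink.type = hdfs
# Write under the Hive warehouse directory so the existing table picks up the files (assumed path)
flumeagent1.sinks.hdfs_sink.hdfs.path = /user/hive/warehouse/customer.db/f1
# Plain text output rather than SequenceFiles
flumeagent1.sinks.hdfs_sink.hdfs.fileType = DataStream
flumeagent1.sinks.hdfs_sink.hdfs.writeFormat = Text
# Roll files on a time interval only, so Hive always sees closed files
flumeagent1.sinks.hdfs_sink.hdfs.rollInterval = 300
flumeagent1.sinks.hdfs_sink.hdfs.rollSize = 0
flumeagent1.sinks.hdfs_sink.hdfs.rollCount = 0
flumeagent1.sinks.hdfs_sink.channel = mem_channel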

Yeah, that's what I wanted to know. If you have achieved your solution using Kafka -> Flume -> Hive, do you really need Spark in the pipeline? Flume -> HDFS is highly performant; putting Spark in between would dampen the performance.

My guess is that Kafka -> Flume -> HDFS is more performant than Kafka -> Spark -> HDFS.

@venkatreddy-amalla might have a much better idea on this.

But I have no knowledge of Spark to try that right now, so I have taken this path. So far I have tested it only with a small amount of data, and it is working fine.

Hello,
With Flume, scalability will be a big bottleneck if you have a huge dataset coming in. I was facing the same issue and was not even able to process a few million records per minute using Flume -> HDFS.
I then replaced it with Spark Streaming, and now I am able to process billions of records without any changes at the code level; I only need to increase the number of executors, and that, too, I am able to do with 1 GB of RAM.
In my case the flow is datasource -> Spark Streaming -> HDFS -> Kafka;
earlier it was datasource -> Flume -> HDFS -> Kafka.

For Spark Streaming you can use the Spark docs or some videos on YouTube.
It is worth testing, though.
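For what it's worth, a minimal Structured Streaming sketch of the Kafka -> Spark -> warehouse-directory path could look like the code below. Treat it as a starting point rather than a tested job: it assumes Spark 2.x with the spark-sql-kafka-0-10 package on the classpath, a broker at localhost:9092, the sale topic, and the same comma-delimited fields as the Flume configuration above; the output path and checkpoint location are placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.split

object KafkaToWarehouse {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-to-hive-warehouse")
      .getOrCreate()
    import spark.implicits._

    // Read the raw Kafka messages as a streaming DataFrame
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
      .option("subscribe", "sale")
      .load()

    // Split the comma-delimited message value into the columns the Hive table expects
    val cols = Seq("id", "name", "email", "street_address", "company")
    val parsed = raw
      .selectExpr("CAST(value AS STRING) AS csv")
      .select(split($"csv", ",").as("f"))
      .select(cols.zipWithIndex.map { case (c, i) => $"f".getItem(i).as(c) }: _*)

    // Write CSV files under the warehouse directory the Hive table points to
    // (path and checkpoint location are assumptions)
    val query = parsed.writeStream
      .format("csv")
      .option("path", "/user/hive/warehouse/customer.db/f1")
      .option("checkpointLocation", "/tmp/checkpoints/kafka_to_hive")
      .start()

    query.awaitTermination()
  }
}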
Thanks.
ixit


Hi Raj,

For the time being, you can add multiple sinks to the same channel.

Like
Sinks: sink1 sink2 sink3
Map all of them to the same channel and it should resolve your issue.
I think you are getting the error because data is coming in from Kafka too fast and your sink processing is too slow.
So, until you have something else available, you can try adding more sinks to the same channel.
It should roughly double your sink throughput with two sinks, and triple it with three.

Please try it and let us know.
I have done the same to speed up the sink.

Thanks
ixit


flumeagent1.sources = source_from_kafka
flumeagent1.channels = mem_channel
flumeagent1.sinks = hive_sink1 hive_sink2 hive_sink3

# Define / Configure source
flumeagent1.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumeagent1.sources.source_from_kafka.zookeeperConnect = localhost:2181
flumeagent1.sources.source_from_kafka.topic = sale
flumeagent1.sources.source_from_kafka.groupID = flume
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sources.source_from_kafka.interceptors = i1
flumeagent1.sources.source_from_kafka.interceptors.i1.type = timestamp
flumeagent1.sources.source_from_kafka.consumer.timeout.ms = 1000

# Hive Sink
flumeagent1.sinks.hive_sink1.type = hive
flumeagent1.sinks.hive_sink1.hive.metastore = thrift://localhost:9083
flumeagent1.sinks.hive_sink1.hive.database = customer
flumeagent1.sinks.hive_sink1.hive.table = f1
flumeagent1.sinks.hive_sink1.hive.txnsPerBatchAsk = 2
flumeagent1.sinks.hive_sink1.hive.partition = %y-%m-%d-%H-%M
flumeagent1.sinks.hive_sink1.batchSize = 100
flumeagent1.sinks.hive_sink1.serializer = DELIMITED
flumeagent1.sinks.hive_sink1.serializer.delimiter = ,
flumeagent1.sinks.hive_sink1.serializer.fieldnames = id,name,email,street_address,company

# Hive Sink
flumeagent1.sinks.hive_sink2.type = hive
flumeagent1.sinks.hive_sink2.hive.metastore = thrift://localhost:9083
flumeagent1.sinks.hive_sink2.hive.database = customer
flumeagent1.sinks.hive_sink2.hive.table = f1
flumeagent1.sinks.hive_sink2.hive.txnsPerBatchAsk = 2
flumeagent1.sinks.hive_sink2.hive.partition = %y-%m-%d-%H-%M
flumeagent1.sinks.hive_sink2.batchSize = 100
flumeagent1.sinks.hive_sink2.serializer = DELIMITED
flumeagent1.sinks.hive_sink2.serializer.delimiter = ,
flumeagent1.sinks.hive_sink2.serializer.fieldnames = id,name,email,street_address,company

# Hive Sink
flumeagent1.sinks.hive_sink3.type = hive
flumeagent1.sinks.hive_sink3.hive.metastore = thrift://localhost:9083
flumeagent1.sinks.hive_sink3.hive.database = customer
flumeagent1.sinks.hive_sink3.hive.table = f1
flumeagent1.sinks.hive_sink3.hive.txnsPerBatchAsk = 2
flumeagent1.sinks.hive_sink3.hive.partition = %y-%m-%d-%H-%M
flumeagent1.sinks.hive_sink3.batchSize = 100
flumeagent1.sinks.hive_sink3.serializer = DELIMITED
flumeagent1.sinks.hive_sink3.serializer.delimiter = ,
flumeagent1.sinks.hive_sink3.serializer.fieldnames = id,name,email,street_address,company

# Use a channel which buffers events in memory
flumeagent1.channels.mem_channel.type = memory
flumeagent1.channels.mem_channel.capacity = 10000
flumeagent1.channels.mem_channel.transactionCapacity = 100

# Bind the source and sink to the channel
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sinks.hive_sink1.channel = mem_channel
flumeagent1.sinks.hive_sink2.channel = mem_channel
flumeagent1.sinks.hive_sink3.channel = mem_channel


Hi @ixitshah, yeah, I think that might be the issue; scalability does become a problem with Flume. But the idea of adding multiple sinks might be a good workaround.


Thanks @ixitshah & @pramodvspk, I will try that out. I also want to try this with Spark.

Yeah, sure. I hope it helps you resolve the issue temporarily.
Please let us know the result.