Flume + Spark Streaming job fails with java.io.IOException


#1

Hi,

I get a java.io.IOException when I run the Spark job for the Flume + Spark Streaming integration tutorial.

The spark-submit command is as below:

spark-submit --master yarn \
--conf spark.ui.port=12345 \
--jars "/usr/hdp/2.6.5.0-292/flume/lib/flume-ng-sdk-1.5.2.2.6.5.0-292.jar,/usr/hdp/2.5.0.0-1245/spark/lib/spark-streaming-flume-sink_2.10-1.6.2.jar,/usr/hdp/2.5.0.0-1245/spark/lib/spark-streaming-flume_2.10-1.6.2.jar" \
src/main/python/StreamingFlumeDepartmentCount.py gw02.itversity.com 8123 /home/pintoalan/streamingflumedepartmentcount/cnt

Please advise asap.

Regards,
Ivan


#2

@pintoalan Run the flume-ng agent command before running the spark-submit job.


#3

The flume-ng agent was already running before I submitted the Spark job. The only difference between the tutorial's spark-submit command and mine is the version of the Flume SDK jar.

Please advise.


#4

@pintoalan Can you share the flume-ng command as well?


#5

flume-ng agent -n sdc -f sdc.conf --conf-dir /etc/flume/conf

The Flume configuration file is as follows:

# example.conf: A single-node Flume configuration

# Name the components on this agent
sdc.sources = ws
sdc.sinks = hd spark
sdc.channels = hdmem sparkmem

# Describe/configure the source
sdc.sources.ws.type = exec
sdc.sources.ws.command = tail -f /opt/gen_logs/logs/access.log

# Describe the sink
sdc.sinks.hd.type = hdfs
sdc.sinks.hd.hdfs.path = hdfs://nn01.itversity.com:8020/user/pintoalan/flume_demo
sdc.sinks.hd.hdfs.filePrefix = StreamDeptCount
sdc.sinks.hd.hdfs.fileSuffix = .txt
sdc.sinks.hd.hdfs.rollInterval = 120
sdc.sinks.hd.hdfs.rollSize = 1048576
sdc.sinks.hd.hdfs.rollCount = 100
sdc.sinks.hd.hdfs.fileType = DataStream

agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = gw02.itversity.com
agent.sinks.spark.port = 8123

# Use a channel which buffers events in memory
sdc.channels.hdmem.type = memory
sdc.channels.hdmem.capacity = 1000
sdc.channels.hdmem.transactionCapacity = 100

sdc.channels.sparkmem.type = memory
sdc.channels.sparkmem.capacity = 1000
sdc.channels.sparkmem.transactionCapacity = 100

# Bind the source and sink to the channel
sdc.sources.ws.channels = hdmem sparkmem
sdc.sinks.hd.channel = hdmem
sdc.sinks.spark.channel = sparkmem


#6

The Spark sink properties use the wrong agent name: they start with agent. instead of sdc, the name you pass with -n. Change the prefix to sdc and try again.
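
A prefix mismatch like this can be caught before the agent is started. The following is only a minimal sketch and is not part of Flume: `find_mismatched_keys` is a hypothetical helper that scans the properties text and reports every key whose first segment differs from the agent name given to `-n`. The sample text reuses a few lines from the configuration posted in this thread.

```python
def find_mismatched_keys(config_text, agent_name):
    """Return property keys whose leading segment is not agent_name."""
    mismatched = []
    for raw_line in config_text.splitlines():
        line = raw_line.strip()
        # Skip blank lines, comments, and anything that is not key = value.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key = line.split("=", 1)[0].strip()
        if key.split(".", 1)[0] != agent_name:
            mismatched.append(key)
    return mismatched


sample = """
sdc.sinks = hd spark
sdc.sinks.spark.channel = sparkmem
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = gw02.itversity.com
agent.sinks.spark.port = 8123
"""

for key in find_mismatched_keys(sample, "sdc"):
    print("wrong agent prefix:", key)
```

Any key it prints belongs to a different agent name, so the agent started with `-n sdc` will not pick up that setting.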


#7

Thanks for pointing out the error in the agent config file.

No errors are raised now when the Spark job is initialized. However, the files the Spark job writes to its HDFS output location are 0-byte files. The HDFS sink itself is fine: I can see files with data written there by the Flume agent. So I suspect that no data is being consumed from the Spark sink.
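
One way to narrow this down is to check whether anything is listening on the Spark sink's host and port. With the polling approach, the SparkSink inside the Flume agent listens on the configured hostname and port and the Spark receiver connects to it. The following is a rough sketch only: `sink_is_listening` is a hypothetical helper, and a successful TCP connection shows that the port is open, not that events are flowing.

```python
import socket


def sink_is_listening(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unreachable host, and timeouts.
        return False


# Example call, using the values from the sdc.sinks.spark.* properties:
#   sink_is_listening("gw02.itversity.com", 8123)
```

If this returns False while the agent is running, the sink was probably not started with the expected settings.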

My spark application code is as follows:

import sys
from operator import add

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

# Arguments: Spark sink host, Spark sink port, output path prefix
hostname = sys.argv[1]
port = int(sys.argv[2])
outputPrefix = sys.argv[3]

conf = SparkConf().setAppName("Streaming Department Count").setMaster("yarn-client")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)  # 30-second batch interval

# Pull events from the Spark sink running inside the Flume agent
agents = [(hostname, port)]
pollingStream = FlumeUtils.createPollingStream(ssc, agents)

# Each event is a (headers, body) pair; keep only the body
messages = pollingStream.map(lambda msg: msg[1])

# Keep requests whose URL path starts with /department, then count per department
departmentMessages = messages.filter(lambda msg: msg.split(" ")[6].split("/")[1] == "department")
departmentNames = departmentMessages.map(lambda msg: (msg.split(" ")[6].split("/")[2], 1))
departmentCount = departmentNames.reduceByKey(add)

departmentCount.saveAsTextFiles(outputPrefix)
ssc.start()
ssc.awaitTermination()
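
The filter and map steps above index straight into the results of split, so a log line with fewer than seven space-separated fields, or a path of just /department, would raise an IndexError. The parsing can be tried outside Spark with a sketch like the one below. `department_of` is a hypothetical helper, and the sample line is made up in a common access-log layout; it is not taken from /opt/gen_logs/logs/access.log.

```python
def department_of(line):
    """Return the department name from an access-log line, or None."""
    fields = line.split(" ")
    if len(fields) < 7:
        return None  # too short to contain a request path
    parts = fields[6].split("/")
    if len(parts) > 2 and parts[1] == "department":
        return parts[2]
    return None


# Hypothetical log line, for illustration only.
sample = ('10.0.0.1 - - [01/Jan/2018:00:00:00 -0800] '
          '"GET /department/fitness/products HTTP/1.1" 200 1234')

print(department_of(sample))
```

In the streaming job, the same function could replace the two lambdas: map each message through it, drop the None results, and pair each remaining name with 1 before reduceByKey.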


#8

I tested it using your console, and it is working fine.


#9

Thanks for the support


closed #10