Spark Streaming with Flume

Hi, I am working on a Spark-Flume department count example.
First I started the Flume agent, which sinks the data into HDFS, and then the Spark Streaming job; that part is working.

But the program jar never shows any results. I submit it like this:

spark-submit --class FlumeSparkDepartmentCount --master yarn --conf spark.ui.port=22231 --jars "/home/cloudera/Public/spark-streaming_2.10-1.6.2.jar,/home/cloudera/Public/spark-streaming-flume_2.10-1.6.2.jar,/home/cloudera/Public/spark-streaming-flume-sink_2.11-2.1.1.jar,/home/cloudera/Public/commons-lang3-3.5.jar,/home/cloudera/Public/flume-ng-sdk-1.5.2.2.4.2.12-1.jar" acc.jar

Here is my fsmp.conf file:

# Name the components on this agent

fsmp.sources = logsource
fsmp.sinks = sparksink hdfssink
fsmp.channels = sparkchannel hdfschannel

# Describe/configure the source

fsmp.sources.logsource.type = exec
fsmp.sources.logsource.command = tail -F /opt/gen_logs/logs/access.log

# Describe the sink

fsmp.sinks.sparksink.type = org.apache.spark.streaming.flume.sink.SparkSink
fsmp.sinks.sparksink.hostname = localhost
fsmp.sinks.sparksink.port = 8887
fsmp.sinks.sparksink.channel = sparkchannel

# Use a channel which buffers events in memory

fsmp.channels.sparkchannel.type = memory
fsmp.channels.sparkchannel.capacity = 1000

# Bind the source and sink to the channel

fsmp.sources.logsource.channels = sparkchannel hdfschannel
fsmp.sinks.sparksink.channel = sparkchannel

#Describe the sink
fsmp.sinks.hdfssink.type = hdfs
fsmp.sinks.hdfssink.hdfs.path = hdfs:///user/cloudera/wllab/flume_example_%Y-%m-%d
fsmp.sinks.hdfssink.hdfs.fileType = DataStream
fsmp.sinks.hdfssink.hdfs.rollInterval = 120
fsmp.sinks.hdfssink.hdfs.rollSize = 10485760
fsmp.sinks.hdfssink.hdfs.rollCount = 30
fsmp.sinks.hdfssink.hdfs.filePrefix = retail
fsmp.sinks.hdfssink.hdfs.fileSuffix = .txt
fsmp.sinks.hdfssink.hdfs.inUseSuffix = .tmp
fsmp.sinks.hdfssink.hdfs.useLocalTimeStamp = true

#Use a channel which buffers events in file for HDFS sink
fsmp.channels.hdfschannel.type = file
fsmp.channels.hdfschannel.capacity = 1000
fsmp.channels.hdfschannel.transactionCapacity = 100

fsmp.sinks.hdfssink.channel = hdfschannel
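
For reference, an agent with this configuration is typically started with a command along the following lines; the conf directory and the path to fsmp.conf are assumptions, and the --name value must match the fsmp prefix used in the properties file:

flume-ng agent --name fsmp --conf /etc/flume-ng/conf --conf-file /home/cloudera/fsmp.conf -Dflume.root.logger=INFO,console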

With port 18887: the left-side window shows the program jar running and the right-side window shows the agent running. So why is it not showing any results?

You have written port 8887 in the Flume configuration file.
Are you using the same port in Spark? Are you writing the data into HDFS or printing it in Spark Streaming? Paste your code if possible.

Here are both scripts that I am using: :slight_smile:

fsmp.sources = logsource
fsmp.sinks = sparksink hdfssink
fsmp.channels = sparkchannel hdfschannel

#Describe/configure the source
fsmp.sources.logsource.type = exec
fsmp.sources.logsource.command = tail -F /opt/gen_logs/logs/access.log

#Describe the sink
fsmp.sinks.sparksink.type = org.apache.spark.streaming.flume.sink.SparkSink
fsmp.sinks.sparksink.hostname = localhost
fsmp.sinks.sparksink.port = 18889
fsmp.sinks.sparksink.channel = sparkchannel

#Use a channel which buffers events in memory
fsmp.channels.sparkchannel.type = memory
fsmp.channels.sparkchannel.capacity = 1000

#Bind the source and sink to the channel
fsmp.sources.logsource.channels = sparkchannel hdfschannel
fsmp.sinks.sparksink.channel = sparkchannel

#Describe the sink
fsmp.sinks.hdfssink.type = hdfs
fsmp.sinks.hdfssink.hdfs.path = hdfs:///user/cloudera/wllab/flume_example_%Y-%m-%d
fsmp.sinks.hdfssink.hdfs.fileType = DataStream
fsmp.sinks.hdfssink.hdfs.rollInterval = 120
fsmp.sinks.hdfssink.hdfs.rollSize = 10485760
fsmp.sinks.hdfssink.hdfs.rollCount = 30
fsmp.sinks.hdfssink.hdfs.filePrefix = retail
fsmp.sinks.hdfssink.hdfs.fileSuffix = .txt
fsmp.sinks.hdfssink.hdfs.inUseSuffix = .tmp
fsmp.sinks.hdfssink.hdfs.useLocalTimeStamp = true

#Use a channel which buffers events in file for HDFS sink
fsmp.channels.hdfschannel.type = file
fsmp.channels.hdfschannel.capacity = 1000
fsmp.channels.hdfschannel.transactionCapacity = 100

fsmp.sinks.hdfssink.channel = hdfschannel


// spark streaming program

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.flume._
object FlumeSparkDepartmentCount {

def main(args: Array[String]): Unit = {
val batchInterval = Seconds(30)
val sparkConf = new SparkConf().setAppName("FlumePollingEventCount").setMaster("yarn-client")
val ssc = new StreamingContext(sparkConf, batchInterval)
val host = "localhost"
val port = 18889

val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.
  createPollingStream(ssc, host, port)
//    stream.map(e => new String(e.event.getBody.array())).print
stream.map(e => new String(e.event.getBody.array())).
  filter(rec => rec.contains("GET /department/")).
  map(rec => (rec.split(" ")(6).split("/")(2), 1)).
  reduceByKey(_ + _).
  print()

//    val streamMap = stream.map(e => new String(e.event.getBody.array()))
//    val streamFilter = streamMap.
//      filter(rec => rec.contains("GET /department/"))
//    val departmentMap = streamFilter.
//      map(rec => (rec.split(" ")(6).split("/")(2), 1))
//    val countByDepartment: DStream[(String, Int)] = departmentMap.
//      reduceByKey((agg, value) => agg + value)
//    countByDepartment.map(rec => rec).print()

ssc.start()
ssc.awaitTermination()

}
}


So you are not able to print the results to the console. Are you able to store the results in HDFS?

You mean the print line is missing?

No, I am asking whether you are able to write the results to HDFS instead of printing them to the console. Can you try that?

No, so far I have only printed them. I will try that as well and update here. :slight_smile:
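
For what it is worth, here is a minimal sketch of writing each batch to HDFS in addition to printing, reusing the stream DStream from the program above; the output path prefix is only an example, not from the original post:

// Same filter/count pipeline as above, kept in a val so it can be reused.
val departmentCounts = stream.map(e => new String(e.event.getBody.array())).
  filter(rec => rec.contains("GET /department/")).
  map(rec => (rec.split(" ")(6).split("/")(2), 1)).
  reduceByKey(_ + _)

// Print a sample of each batch to the console for debugging.
departmentCounts.print()

// Also write every batch as text files under an HDFS prefix
// (the path below is an assumption for illustration).
departmentCounts.saveAsTextFiles("/user/cloudera/wllab/department_counts")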