Exercise 20 - Streaming analytics using Flume, Kafka and Spark

Resources:

  • Click here for $35 coupon for CCA 175 Spark and Hadoop Developer using Python.
  • Click here for $35 coupon for CCA 175 Spark and Hadoop Developer using Scala.
  • Click here for $25 coupon for HDPCD:Spark using Python.
  • Click here for $25 coupon for HDPCD:Spark using Scala.
  • Click here for access to a state-of-the-art 13-node Hadoop and Spark Cluster

Description

  • To integrate Flume, Kafka, and Spark and perform analysis on the data in a streaming fashion.

Problem Statement

  • Get streaming data from /opt/gen_logs/logs/access.log and perform the following:
    • Store raw data in HDFS
    • Analysis has to be done to get the number of events for each department
    • Store aggregated data in HDFS. Aggregation should happen by department every 2 seconds.
    • Store both windowing as well as non-windowing aggregations

For samples use these URLs
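The Flume side of the pipeline can be wired up along these lines: a single exec source tailing access.log, fanned out to an HDFS sink (raw data) and a Kafka sink feeding the topic that the Spark Streaming job reads. This is only a sketch; the agent, channel, and sink names, the raw-data HDFS path, and the Kafka sink property names (which differ between Flume 1.6 and 1.7+) are assumptions to adapt to your environment. The topic and broker address come from the Spark code shown further down.

# Assumed Flume agent (names are illustrative): tail access.log, fan out to HDFS and Kafka
wh.sources = ws
wh.channels = hdfsChannel kafkaChannel
wh.sinks = hdfsSink kafkaSink

# Exec source tailing the web server log
wh.sources.ws.type = exec
wh.sources.ws.command = tail -F /opt/gen_logs/logs/access.log
wh.sources.ws.channels = hdfsChannel kafkaChannel

# One memory channel per sink
wh.channels.hdfsChannel.type = memory
wh.channels.hdfsChannel.capacity = 10000
wh.channels.hdfsChannel.transactionCapacity = 1000
wh.channels.kafkaChannel.type = memory
wh.channels.kafkaChannel.capacity = 10000
wh.channels.kafkaChannel.transactionCapacity = 1000

# HDFS sink for the raw data (path is an assumption)
wh.sinks.hdfsSink.type = hdfs
wh.sinks.hdfsSink.channel = hdfsChannel
wh.sinks.hdfsSink.hdfs.path = /user/nikkhiel123/streamingExerciseRaw
wh.sinks.hdfsSink.hdfs.fileType = DataStream
wh.sinks.hdfsSink.hdfs.rollInterval = 120
wh.sinks.hdfsSink.hdfs.rollSize = 0
wh.sinks.hdfsSink.hdfs.rollCount = 0

# Kafka sink feeding the topic the Spark Streaming job subscribes to
# (Flume 1.7+ property names; Flume 1.6 uses topic / brokerList instead)
wh.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
wh.sinks.kafkaSink.channel = kafkaChannel
wh.sinks.kafkaSink.kafka.topic = NikExce
wh.sinks.kafkaSink.kafka.bootstrap.servers = nn02.itversity.com:6667

The agent would then be started with something like flume-ng agent --name wh --conf-file <path to this file>, with the exact command depending on how Flume is laid out on the cluster.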

@itversity
In the above exercise, while saving the windowing and non-windowing aggregations to HDFS, a separate directory is created in HDFS for each window and batch interval rather than all files being saved in the same directory. My Spark Streaming code is below.

/**
  * Created by Chakote on 24-03-2017.
  */
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

object SparkStreamingExcercise {

  def main(args: Array[String]): Unit = {

    val batchInterval = Seconds(2)
    val sparkConf = new SparkConf().setAppName("DepartmentWiseAnalysisExce").setMaster("yarn-client")
    val topicSet = "NikExce".split(",").toSet
    val kafkaParam = Map[String, String]("metadata.broker.list" -> "nn02.itversity.com:6667")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Direct stream from Kafka; each message is a (key, value) pair
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topicSet)
    val lines = messages.map(_._2)

    // Keep only department page hits and map each record to (department, 1)
    val linesFiltered = lines.filter(rec => rec.contains("GET /department/"))
    val linesMap = linesFiltered.map(rec => (rec.split(" ")(6).split("/")(2), 1))

    // Non-windowing aggregation per 2-second batch
    val countByDepartments = linesMap.reduceByKey(_ + _)
    countByDepartments.saveAsTextFiles("/user/nikkhiel123/streamingExerciseDirect", "txt")
    //countByDepartments.print()

    // Windowing aggregation: 60-second window sliding every 10 seconds
    val countByDepartmentsWindow = linesMap.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(10))
    countByDepartmentsWindow.print()
    countByDepartmentsWindow.saveAsTextFiles("/user/nikkhiel123/streamingExerciseWindow", "txt")

    ssc.start()
    ssc.awaitTermination()
  }
}

Multiple directories are generated, as shown below:

drwxr-xr-x - nikkhiel123 hdfs 0 2017-03-25 01:37 /user/nikkhiel123/streamingExerciseDirect-1490420272000.txt
drwxr-xr-x - nikkhiel123 hdfs 0 2017-03-25 01:37 /user/nikkhiel123/streamingExerciseDirect-1490420274000.txt
drwxr-xr-x - nikkhiel123 hdfs 0 2017-03-25 01:37 /user/nikkhiel123/streamingExerciseDirect-1490420276000.txt
drwxr-xr-x - nikkhiel123 hdfs 0 2017-03-25 01:30 /user/nikkhiel123/streamingExerciseWindow
drwxr-xr-x - nikkhiel123 hdfs 0 2017-03-25 01:36 /user/nikkhiel123/streamingExerciseWindow-1490420184000.txt
drwxr-xr-x - nikkhiel123 hdfs 0 2017-03-25 01:36 /user/nikkhiel123/streamingExerciseWindow-1490420194000.txt
drwxr-xr-x - nikkhiel123 hdfs 0 2017-03-25 01:36 /user/nikkhiel123/streamingExerciseWindow-1490420204000.txt

How can we put all the generated results in one single folder?

Thank you.

Periodically, you can run a simple program which will merge these files.

Thank you, sir.
So do you mean that the files will merge automatically after running these jobs, or do we need to write a separate program to merge them?

You have to write a simple program. It can be as simple as this:

sc.textFile("/user/nikkhiel123/streamingExerciseDirect*/*.txt").coalesce(1).saveAsTextFile(TARGET_PATH)

and then remove the directories.
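If you prefer to run this as a standalone job, a minimal sketch could look like the following. The merged target path, the glob over the per-batch output directories, and the use of the Hadoop FileSystem API for cleanup are assumptions to adjust as needed.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object MergeStreamingOutput {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MergeStreamingOutput"))

    // Each batch was written to a directory named streamingExerciseDirect-<timestamp>.txt;
    // the glob matches those directories and textFile reads the part files inside each
    val sourceGlob = "/user/nikkhiel123/streamingExerciseDirect-*.txt"
    val targetPath = "/user/nikkhiel123/streamingExerciseDirectMerged"  // assumed target path

    // Collapse everything into a single partition and write it out once
    sc.textFile(sourceGlob).coalesce(1).saveAsTextFile(targetPath)

    // Remove the per-batch directories once the merged copy has been written
    val fs = FileSystem.get(sc.hadoopConfiguration)
    fs.globStatus(new Path(sourceGlob)).foreach(status => fs.delete(status.getPath, true))

    sc.stop()
  }
}

You could submit it with spark-submit (supplying --master), or paste the body of main into spark-shell, where sc already exists.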

Thank you, sir. I will use this.