Exercise 24 - Spark Streaming word count

Tags: sbt, eclipse, spark-streaming, apache-spark, scala
#1

Problem Statement

  • Build a network word count program using Spark Streaming
  • Make sure netcat is running first, e.g. nc -lk 9999 (see the run sketch below)
  • Submit the jar file with the word count program
  • Type something into nc and make sure the words are counted as part of the job

Please provide the following

  • Word count program code
  • Spark submit command
  • Sample word count data from the logs
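
Putting the steps together, a minimal end-to-end run could look like the sketch below; the port and the class/jar names are placeholders rather than values from any particular submission:

Terminal 1 (start netcat listening on the chosen port):

nc -lk 9999

Terminal 2 (submit the packaged job):

spark-submit --class "NetworkWordCount" wordcount_2.10-1.0.jar

Lines typed into Terminal 1 then appear as (word,count) pairs in the job's console output once per batch interval.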

#2

#Word count program code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import StreamingContext._

object Streaming {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[4]")

    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create a DStream that connects to the netcat server on localhost:1234
    // and use the stream to count words in each batch
    val lines = ssc.socketTextStream("localhost", 1234)

    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }

}

#Spark submit command

spark-submit --class "Streaming" scala-spark-training_2.10-1.0.jar
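
For reference, a jar with that name could be produced by sbt from a minimal build.sbt along these lines (the Spark version is an assumption, inferred from the _2.10 artifact suffix and the 2016-era logs):

name := "scala-spark-training"

version := "1.0"

scalaVersion := "2.10.6"

// spark-streaming pulls in spark-core; "provided" because spark-submit
// supplies the Spark classes at runtime (version here is an assumption)
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided"

Running sbt package then writes target/scala-2.10/scala-spark-training_2.10-1.0.jar.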

#Netcat input

[cloudera@quickstart ~]$ nc -lk 1234
Hi my name is Farhan Misarwala
I am learning big data 
Hi my name is Farhan Misarwala
I am learning big data

#Sample word count data from the logs

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/12/22 14:43:16 INFO Remoting: Starting remoting
16/12/22 14:43:16 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.2.15:38483]
16/12/22 14:43:16 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@10.0.2.15:38483]


-------------------------------------------
Time: 1482398014000 ms
-------------------------------------------
(my,1)
(Hi,1)
(is,1)
(Farhan,1)
(name,1)
(Misarwala,1)


-------------------------------------------
Time: 1482398018000 ms
-------------------------------------------

-------------------------------------------
Time: 1482398020000 ms
-------------------------------------------
(am,1)
(big,1)
(I,1)
(learning,1)
(data,1)

-------------------------------------------
Time: 1482398022000 ms
-------------------------------------------

-------------------------------------------
Time: 1482398024000 ms
-------------------------------------------

#3
// Run from spark-shell: stop the shell's default SparkContext first
import org.apache.spark._
import org.apache.spark.streaming._

sc.stop()
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(30))
val lines = ssc.socketTextStream("localhost", 8899)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()             // Start the computation
ssc.awaitTermination()
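
These statements read like a spark-shell session (hence the sc.stop() at the top), so they could be pasted into a shell started with something like:

spark-shell --master local[2]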

Time: 1482398610000 ms

(further,1)
(are,1)
(wordCounts.print(),1)
(call,1)
((word,1)
(after,1)
(is,3)
(few,1)
(counts,1)
(transformation),1)
…


#4

Streaming Code

package main.scala


import com.typesafe.config._
import org.apache.spark._
import org.apache.spark.streaming._

object SparkStreaming {
  def main(args: Array[String]) {
    val appConf = ConfigFactory.load() // loaded but not used below
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount_Raja")
    val ssc = new StreamingContext(conf, Seconds(20))
    val lines = ssc.socketTextStream("localhost", 19999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Netcat input:

flatMap is a one-to-many DStream operation that creates a new DStream by generating multiple new records from each record in the source DStream. In this case, each line will be split into multiple words and the stream of words is represented as the words DStream. Next, we want to count these words.
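
As a quick illustration of that one-to-many behaviour with plain Scala collections:

val lines = List("Hi my name is", "I am learning")
lines.map(_.split(" "))     // one Array per line: List(Array(Hi, my, name, is), Array(I, am, learning))
lines.flatMap(_.split(" ")) // flattened into words: List(Hi, my, name, is, I, am, learning)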

Spark-submit:

spark-submit --class main.scala.SparkStreaming sparkstreaming_2.11-1.0.jar

Output:


Time: 1482398540000 ms

(stream,1)
(records,1)
(this,1)
(case,1)
(is,2)
(split,1)
(line,1)
(generating,1)
(DStream.,2)
(will,1)


#5

Word count program code

package com.scala.avgrvn

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds

object SparkStreamingWC {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamWordCount")
    val stcn = new StreamingContext(conf, Seconds(30))

    val lines = stcn.socketTextStream("localhost", 12345)

    val wordCounts = lines.flatMap(rec => rec.split(" ")).map(rec => (rec, 1)).reduceByKey(_ + _)

    wordCounts.print()

    stcn.start()
    stcn.awaitTermination()
  }
}

Spark submit command

spark-submit --class "com.scala.avgrvn.SparkStreamingWC" scalaMvn-0.0.1-SNAPSHOT.jar

Sample word count data from the logs

Time: 1482398670000 ms

(is,1)
(there,1)
(boy,1)
(a,1)
(good,1)


#6

import org.apache.spark._
import org.apache.spark.streaming._

object sparkStreaming {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(30))
    val lines = ssc.socketTextStream("localhost", 9992)

    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}


spark-submit --class "sparkStreaming" sparkstreaming_2.10-1.0.jar


Time: 1482399240000 ms

(free,2)
(broadcast_4,1)
((estimated,2)
(2.6,1)
(KB),2)
(INFO,1)
(MemoryStore:,1)
(as,2)
(16/12/22,1)
(values,1)
…


#7

Word count program code

package sparkstream.spark.scala

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object SparkStreamWC {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Spark Streaming Word Count").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(30))
    val lines = ssc.socketTextStream("localhost", 54321)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Spark submit command

spark-submit --class "sparkstream.spark.scala.SparkStreamWC" retailanother-0.0.1-SNAPSHOT.jar

Sample word count data from the logs

(further,1)
(are,1)
(after,1)
(few,1)
(counts,1)
(transformation),1)
(have,1)
(only,1)
(been,1)
(frequency,1)

(further,4)
(are,4)
(after,4)
(few,4)
(counts,4)
(transformation),4)
(have,4)
(only,4)
(,5)
(frequency,4)


#8

Word count program code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import StreamingContext._

object StreamingWC {
  def main(args: Array[String]) {
    // use local[2] rather than local: the socket receiver occupies one
    // thread, so at least one more is needed to process the data
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCount")
    val ssc = new StreamingContext(conf, Seconds(30))
    val lines = ssc.socketTextStream("localhost", 6666)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Spark submit command:

spark-submit --class "java.StreamingWC" DataFrames-0.0.1-SNAPSHOT.jar

Sample word count data from the logs:

(Come,1)
(Defects,1)
(In,1)
(Information,1)
(Jan,1)
(Plain,2)
(Project,5)


#9

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object sparkstreaming {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))

    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    wordCounts.print()
    ssc.start() // Start the computation
    ssc.awaitTermination()
  }
}

nc -lk 9999
hello world

spark-submit --class sparkstreaming dataframes-0.0.1-SNAPSHOT.jar


Time: 1482407364000 ms

(hello,1)
(world,1)


#10

#Word count program code

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
import com.typesafe.config.ConfigFactory

object SparkExercise24 {
  def main(args: Array[String]): Unit = {
    val appConf = ConfigFactory.load()
    // args(0) picks the config block (e.g. "prod"), args(1) is the batch
    // interval in seconds, args(2) is the socket port
    val conf = new SparkConf().setAppName("Sneh Exercise 24").setMaster(appConf.getConfig(args(0)).getString("deployment-streaming"))

    val ssc = new StreamingContext(conf, Seconds(args(1).toInt))
    val lines = ssc.socketTextStream("localhost", args(2).toInt)
    val words = lines.flatMap(_.split(" "))

    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
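
The master URL here comes from a Typesafe Config lookup, so the jar needs an application.conf on its classpath. A hypothetical file matching the lookup above might contain (the block name matches args(0) from the submit command; the local[2] value is only illustrative):

prod {
  deployment-streaming = "local[2]"
}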


#Spark submit command

Terminal 1:
nc -lk 9002

Terminal 2:
spark-submit --class SparkExercise24 <application jar> prod 30 9002
