Spark Streaming: word count problems with both spark-shell and spark-submit

spark-streaming

#1

Hi all,
I am using the Cloudera VM and trying to follow videos 16 and 17 on this page, which run a word count on streaming input: http://www.itversity.com/topic/cca175-spark-streaming-getting-started-scala/

When I run the program in the regular spark-shell, I get the following warning after ssc.start:

“WARN streaming.StreamingContext: Dynamic Allocation is enabled for this application. Enabling Dynamic allocation for Spark Streaming applications can cause data loss if Write Ahead Log is not enabled for non-replayable sources like Flume. … WARN cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources.”

When I send some input to port 9999 as described in the video, I get the following error:
"Error sending message [message = RemoveBroadcast(1,true)] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout"

I also get this error when running the job with spark-submit (video 17), and I never see the word counts, e.g. (Hello,4), on the screen.

My interpretation is that somehow no resources are available to process the incoming data, so the request times out after 120 seconds.

Do you have any suggestions on how to fix this?

Many thanks for any feedback!


#2

I was facing the same issue with the Cloudera QuickStart VM, and it was resolved by following https://stackoverflow.com/questions/36577293/spark-streaming-kafkawordcount-cannot-run-on-a-spark-standalone-cluster
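The one Spark configuration property passed explicitly in Step-2 below is spark.rpc.netty.dispatcher.numThreads=2. For the spark-submit case from the question (video 17), the same property can be set either with --conf on the spark-submit command line or in the application's own SparkConf. A minimal sketch of the latter, assuming Spark 1.6 with the spark-streaming module on the classpath; the object name StreamingWordCount, the buildConf helper and the argument handling are hypothetical, not taken from the videos:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical spark-submit counterpart of the Step-2 shell session.
// Assumes Spark 1.6.x with spark-streaming on the classpath; the master
// (e.g. yarn) is expected to come from the spark-submit command line.
object StreamingWordCount {
  // Same property the Step-2 spark-shell command passes with --conf.
  def buildConf(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      .set("spark.rpc.netty.dispatcher.numThreads", "2")

  def main(args: Array[String]): Unit = {
    // Defaults mirror Step-1/Step-2; override with: <host> <port>
    val host = if (args.length > 0) args(0) else "quickstart.cloudera"
    val port = if (args.length > 1) args(1).toInt else 9595

    // 10-second batches, as in the shell session.
    val ssc = new StreamingContext(buildConf("StreamingWordCount"), Seconds(10))
    val wordCount = ssc.socketTextStream(host, port)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    wordCount.print()

    ssc.start()
    // Unlike the interactive shell, a submitted driver must block here,
    // otherwise main returns and the application exits.
    ssc.awaitTermination()
  }
}
```

It could then be launched with something like spark-submit --master yarn --class StreamingWordCount <your-jar>, with netcat already listening as in Step-1. Passing --conf spark.rpc.netty.dispatcher.numThreads=2 to spark-submit instead should have the same effect without rebuilding the jar.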

I hope the steps below help resolve the issue mentioned above:

Step-1: Start a netcat listener

asus@asus-GL553VD:~$ ssh cloudera@192.168.211.142
cloudera@192.168.211.142's password: 
Last login: Sun May 20 20:58:04 2018 from 192.168.211.1
[cloudera@quickstart ~]$ nc -lk quickstart.cloudera 9595
this is the first message
this is the second message
this is the third message
this is the first message
this is the second message
this is the third message
this is the first message
this is the second message
this is the third message
this is the first message
this is the second message
this is the third message
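socketTextStream in Step-2 acts as a TCP client: it connects to the given host and port and reads newline-delimited UTF-8 text, so a listener must already be running there, which is the role nc -lk plays. Purely to illustrate that contract (this is not part of the original steps), here is a hypothetical Scala LineServer that listens on a port and writes a fixed list of lines, each terminated by a newline, to each of up to maxClients connecting clients:

```scala
import java.io.{BufferedWriter, OutputStreamWriter}
import java.net.ServerSocket
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for `nc -lk <host> <port>`: the Spark receiver is the
// client, so this side listens and pushes newline-terminated lines.
object LineServer {
  // Serve `lines` to up to `maxClients` connections, one at a time,
  // then close the listening socket.
  def serve(server: ServerSocket, lines: Seq[String], maxClients: Int): Unit = {
    try {
      for (_ <- 1 to maxClients) {
        val client = server.accept() // blocks until a receiver connects
        val out = new BufferedWriter(
          new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8))
        try lines.foreach(line => out.write(line + "\n"))
        finally { out.flush(); client.close() }
      }
    } finally server.close()
  }

  def main(args: Array[String]): Unit = {
    // Port 9595 matches Step-1.
    val server = new ServerSocket(9595)
    serve(server, Seq("this is the first message",
                      "this is the second message",
                      "this is the third message"), maxClients = 1)
  }
}
```

Unlike nc -lk, this sketch sends a canned list instead of reading from the terminal, and it stops after maxClients connections.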

Step-2: Launch the Spark shell and verify the word count output

asus@asus-GL553VD:~$ ssh cloudera@192.168.211.142
cloudera@192.168.211.142's password: 
Last login: Sun May 20 21:40:04 2018 from 192.168.211.1

[cloudera@quickstart ~]$ spark-shell --master yarn --conf spark.rpc.netty.dispatcher.numThreads=2 
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/parquet/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/avro/avro-tools-1.7.6-cdh5.12.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
18/05/20 21:47:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/05/20 21:47:37 WARN util.Utils: Your hostname, quickstart.cloudera resolves to a loopback address: 127.0.0.1; using 192.168.211.142 instead (on interface eth1)
18/05/20 21:47:37 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
18/05/20 21:47:41 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Spark context available as sc (master = yarn-client, app id = application_1525607376030_0038).
SQL context available as sqlContext.

scala> import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.Seconds

scala> import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext

scala> val ssc = new StreamingContext(sc, Seconds(10))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@41fb0731

scala> val lines = ssc.socketTextStream("quickstart.cloudera", 9595)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@53ec9705

scala> val words = lines.flatMap((line: String) => line.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@7fd7bc7d

scala> val wordsMap = words.map((word: String) => (word, 1))
wordsMap: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@57b6805a

scala> val wordCount = wordsMap.reduceByKey((agg, ele) => agg + ele)
wordCount: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@33edad01

scala> wordCount.print

scala> ssc.start
scala> -------------------------------------------
Time: 1526878150000 ms
-------------------------------------------
(this,3)
(is,3)
(second,1)
(first,1)
(message,3)
(third,1)
(the,3)

-------------------------------------------
Time: 1526878160000 ms
-------------------------------------------

-------------------------------------------
Time: 1526878170000 ms
-------------------------------------------
(this,6)
(is,6)
(second,2)
(first,2)
(message,6)
(third,2)
(the,6)

-------------------------------------------
Time: 1526878180000 ms
-------------------------------------------
(this,3)
(is,3)
(second,1)
(first,1)
(message,3)
(third,1)
(the,3)

-------------------------------------------
Time: 1526878190000 ms
-------------------------------------------

-------------------------------------------
Time: 1526878200000 ms
-------------------------------------------

-------------------------------------------
Time: 1526878210000 ms
-------------------------------------------

-------------------------------------------
Time: 1526878220000 ms
-------------------------------------------
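Each 10-second batch above is counted on its own: reduceByKey on a DStream only combines records from the same batch, so nothing carries over between intervals. That is why the output shows counts of 3, then an empty batch, then 6, then 3 again, even though the three messages were sent four times in total. The plain-Scala sketch below (no Spark needed) applies the same flatMap/map/reduceByKey steps to each batch separately; the grouping of lines into batches is inferred from the printed counts, not from actual arrival times, and the names BatchWordCount and countBatch are hypothetical:

```scala
// Plain-Scala model of what each batch of the DStream pipeline computes.
// Batch contents are inferred from the printed counts; the real arrival
// times of the lines are not known.
object BatchWordCount {
  val messages = Seq(
    "this is the first message",
    "this is the second message",
    "this is the third message")

  // flatMap(split) -> map((_, 1)) -> reduceByKey(_ + _), within one batch only.
  def countBatch(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  // One round of the messages, nothing, two rounds, one round.
  val batches: Seq[Seq[String]] =
    Seq(messages, Seq.empty, messages ++ messages, messages)

  def main(args: Array[String]): Unit =
    batches.zipWithIndex.foreach { case (batch, i) =>
      println(s"batch ${i + 1}: ${countBatch(batch)}")
    }
}
```

Getting running totals across batches would require a stateful operation instead of a plain reduceByKey.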