Spark streaming issues


#1

Hi,

My producer puts messages on the Kafka broker in the format below.

7,abc,accept,100
8,xyz,accept,100
9,lkj,accept,100
10,hjk,reject,100
10,a,accept,200
10,b,reject,300

When I consume this with my Scala consumer code it is not working: the output files are blank. Can you please help me figure out what the issue is in my consumer code?

package org.spark.learning

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._

import kafka.serializer.StringDecoder

object kafkaconsumerstreaming {
  def main(args: Array[String]): Unit = {
    // Note: curly quotes from the original paste replaced with straight quotes,
    // otherwise this does not compile
    val conf = new SparkConf().setAppName("Streaming data RDBMS").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(30))
    val topicSet = Set("mysqltopic2")
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    // Drop the Kafka key, keep only the message value
    val message = stream.map(_._2)
    // Key by the status column (index 2), value = the amount column (index 3)
    val r1 = message.map { line =>
      val arr = line.split(",")
      (arr(2), arr(3).toInt)
    }
    val r2 = r1.reduceByKey(_ + _)
    r2.saveAsTextFiles("file:///home/cloudera/res")
    ssc.start()
    ssc.awaitTermination()
  }
}
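For what it's worth, the per-record parsing and per-status aggregation can be checked outside Spark with plain Scala (`ParseCheck` and its sample data are made up here, just mirroring the `split`/`reduceByKey` logic above):

```scala
object ParseCheck {
  // Aggregate (status, amount) pairs the same way the reduceByKey step would
  def totals(lines: Seq[String]): Map[String, Int] =
    lines
      .map { line => val arr = line.split(","); (arr(2), arr(3).toInt) }
      .groupBy(_._1)
      .map { case (status, pairs) => (status, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val sample = Seq("7,abc,accept,100", "8,xyz,accept,100", "10,hjk,reject,100")
    println(totals(sample)) // accept -> 200, reject -> 100
  }
}
```

If this prints the expected totals locally, the parsing logic is fine and the blank output files point at the Kafka connection instead.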

Thanks,
Suresh


#2

Hi Suresh,

Are you running this code locally or on the lab?

If you're running on the lab, check the line below. It should not be localhost; it should point at the actual broker host:

val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
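For example, if the broker runs on a different machine than the driver, the broker list would point at that machine's address (the hostname below is just an assumption; substitute your own):

```scala
// Hypothetical broker host; replace with the broker machine's real hostname or IP
val kafkaParams = Map[String, String]("metadata.broker.list" -> "quickstart.cloudera:9092")
```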

Thanks


#3

Hi Santhosh,

I am running the code on the Cloudera VM, not on the lab.

Thanks,
Suresh