How to fetch hashtags and text from tweets stored in a Kafka topic using the Spark API in Scala


I am working on Hortonworks. I have stored tweets from Twitter in a Kafka topic using Flume. I am performing sentiment analysis on the tweets, with Kafka as the producer and Spark as the consumer, using Scala in spark-shell. But I want to fetch only specific content from the tweets: the text, the hashtags, the sentiment analysis result (whether the tweet is positive or negative), and the words from the tweet that I have selected as positive or negative words. My training data is Data.txt.

Data.txt contains data as below:

like positive
doom negative
doomed negative
doubt positive
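The code below splits each line of Data.txt on a tab (`line.split("\t")`), while the sample above shows a space between the word and its label. If the file really is space-separated, the `val Array(word, happiness) = ...` pattern throws a `scala.MatchError` when the RDD is computed. A minimal plain-Scala sketch that accepts either separator and skips malformed lines; `Lexicon` and `parse` are hypothetical names used only for illustration:

```scala
// Hypothetical helper: parse lexicon lines of the form "<word><whitespace><label>".
// Splitting on any run of whitespace accepts both tab- and space-separated files.
object Lexicon {
  def parse(lines: Seq[String]): Map[String, String] =
    lines
      .map(_.trim)
      .filter(_.nonEmpty)
      .flatMap { line =>
        line.split("\\s+") match {
          // Lower-case both parts so later lookups are case-insensitive.
          case Array(word, label) => Some(word.toLowerCase -> label.toLowerCase)
          case _                  => None // skip malformed lines instead of throwing
        }
      }
      .toMap
}
```

In the Spark job, the same `split("\\s+")` match could replace the tab split inside the `textFile(...)` mapping.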

I added the dependencies: org.apache.spark:spark-streaming-kafka_2.10:1.6.2, org.apache.spark:spark-streaming_2.10:1.6.2

Here is my code:

import org.apache.spark._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.kafka._
// In spark-shell a SparkContext already exists as `sc`; reuse it, because
// building a second one from a new SparkConf fails in the same JVM.
val ssc = new StreamingContext(sc, Seconds(5))
val zkQuorum=""
val group="test-consumer-group"
val topics="test"
val numThreads=5
val args=Array(zkQuorum, group, topics, numThreads)
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val hashTags = lines.flatMap(_.split(" ")).filter(_.startsWith("#"))
val wordSentimentFilePath = "hdfs://"
val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line =>
      val Array(word, happiness) = line.split("\t")
      (word, happiness)
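}.cache()
// Count each hashtag (without its leading "#") over a 60-second window,
// join the counts with the word -> sentiment pairs, and emit (sentiment, word).
// The sentiment label is a String, so it is paired with the word, not multiplied
// by the count (String * Int would just repeat the label).
// Trailing dots keep spark-shell reading the chain as one expression.
val happiest60 = hashTags.map(hashTag => (hashTag.tail, 1)).
  reduceByKeyAndWindow(_ + _, Seconds(60)).
  transform { topicCount => wordSentiments.join(topicCount) }.
  map { case (topic, (sentiment, _)) => (sentiment, topic) }.
  transform(_.sortByKey(false))
happiest60.print()
ssc.start()
ssc.awaitTermination()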

Using the above code, I got output like this:

(negative, fear) (positive, fitness)

But I want output like this:

(#sports,Text from the Tweets,fitness,positive)

I have no idea how to fetch the hashtag and the text from a tweet to get output like the above.
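One way to get rows shaped like `(#sports, text, fitness, positive)` is to work on the tweet text directly instead of counting hashtags: pair every hashtag in a tweet with every lexicon word found in the same tweet. This is a plain-Scala sketch under a few assumptions: the input is the tweet text itself (not the whole `StatusJSONImpl{...}` string), the lexicon is a `Map` from lower-case word to label (for example built from Data.txt), and hashtags are matched with the regex `#\w+`. `TweetRows.rows` is a hypothetical helper, not a Spark or Twitter4J API.

```scala
// Hypothetical helper: turn one tweet's text into (hashtag, text, word, sentiment)
// rows, using a word -> sentiment lexicon whose keys are lower-case.
object TweetRows {
  private val HashTag = "#\\w+".r

  def rows(text: String, lexicon: Map[String, String]): List[(String, String, String, String)] = {
    val hashTags = HashTag.findAllIn(text).toList
    // Lower-case words with leading/trailing punctuation stripped,
    // so "fitness!" is looked up as "fitness".
    val words = text.split("\\s+").toList
      .map(_.replaceAll("^\\W+|\\W+$", "").toLowerCase)
      .filter(_.nonEmpty)
    // Keep only words present in the lexicon, each paired with its label.
    val matched = words.distinct.flatMap(w => lexicon.get(w).map(s => (w, s)))
    // Cross every hashtag with every matched word.
    for {
      tag <- hashTags
      (word, sentiment) <- matched
    } yield (tag, text, word, sentiment)
  }
}
```

A tweet with no hashtag or no lexicon word produces no rows. Because Data.txt is small, one option in the streaming job is to collect it to the driver, broadcast it, and call the helper per record, e.g. `lines.flatMap(text => TweetRows.rows(text, lexicon.value))`; this is a suggestion, not something tested against Spark 1.6.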


A sample of code to extract hashtags from streaming text:

stream.flatMap(text => text.split(" ").filter(_.startsWith("#")))
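Splitting on spaces keeps run-together tags such as `#OMG#AnotherOneBitesTheDust#Dhawan` as a single token and leaves trailing punctuation on `#INDIA!!`. A hedged alternative is to find hashtags with a regular expression; `HashTags.extract` is a hypothetical helper name:

```scala
// Hypothetical helper: extract hashtags with a regex instead of splitting on spaces,
// so "#OMG#ComeOn" yields two tags and trailing punctuation ("#INDIA!!") is dropped.
object HashTags {
  private val Pattern = "#\\w+".r

  def extract(text: String): List[String] = Pattern.findAllIn(text).toList
}
```

In a DStream this could be used as `stream.flatMap(text => HashTags.extract(text))`. Note that `\w` in Java regexes matches only ASCII letters, digits and `_` by default, so hashtags in other scripts would need a broader pattern.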


I want to fetch the text from the tweet stored in the Kafka topic…

The following is a tweet stored in the Kafka topic:

StatusJSONImpl{createdAt=Tue Oct 10 13:59:25 UTC 2017, id=917751425102467072, text='#OMG#AnotherOneBitesTheDust#Dhawan is gone to a brillaint catch by #Warner#ComeOn #INDIA!! post a totalat least!#INDvsAUS #INDvsAUSt20', source='Twitter Web Client', isTruncated=true, inReplyToStatusId=-1, inReplyToUserId=-1, isFavorited=false, isRetweeted=false, favoriteCount=0, inReplyToScreenName='null', geoLocation=null, place=null, retweetCount=0, isPossiblySensitive=false, lang='en', contributorsIDs=[], retweetedStatus=null, userMentionEntities=[], urlEntities=[], hashtagEntities=[HashtagEntityJSONImpl{text='OMG'}, HashtagEntityJSONImpl{text='AnotherOneBitesTheDust'}, HashtagEntityJSONImpl{text='Dhawan'}, HashtagEntityJSONImpl{text='Warner'}, HashtagEntityJSONImpl{text='ComeOn'}, HashtagEntityJSONImpl{text='INDIA'}

I want to fetch only the text, as below:

text='#OMG #AnotherOneBitesTheDust#Dhawan is gone to a brillaint catch by #Warner#ComeOn #INDIA!! post a total at least!
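The Kafka message is the `toString()` output of Twitter4J's `StatusJSONImpl`, not JSON, so the text has to be cut out of that string. A minimal sketch, assuming the field order seen in the sample (`text='...'` immediately followed by `, source=`) and plain single quotes as Twitter4J's `toString()` writes them (the curly quotes in the pasted sample are likely forum formatting). A simple `'(.*?)'` would stop at the first apostrophe inside the tweet (for example in "don't"), so the match runs up to `', source=` instead. `StatusText.extract` is a hypothetical name:

```scala
// Hypothetical helper: pull the text field out of a StatusJSONImpl.toString() dump.
// Assumes text='...' is directly followed by ", source=" as in the sample record.
object StatusText {
  // (?s) lets '.' match newlines inside the tweet; the lazy group stops at the
  // first "', source=", so apostrophes inside the tweet text are kept.
  private val TextField = """(?s)text='(.*?)', source=""".r.unanchored

  def extract(record: String): Option[String] = record match {
    case TextField(text) => Some(text)
    case _               => None
  }
}
```

Applied to the stream, something like `lines.flatMap(record => StatusText.extract(record).toList)` would keep only the tweet text. If you control the producer, writing the status as JSON (or only `getText`) would avoid this string parsing altogether.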


The output of that function is a stream of hashtags only, so you can store them or process them further.