Facing issue while running twitter spark streaming application

Hi Team,

I tried running the following code, but I get the following error every time:

java.lang.ClassNotFoundException: TwitterPopularTags
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:175)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

code:

package org.apache.spark.streaming.examples

import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel

/**
 * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
 * stream. The stream is instantiated with credentials and optionally filters supplied by the
 * command line arguments.
 *
 * Expected arguments, in order: consumer key, consumer secret, access token, access token
 * secret, followed by zero or more filter keywords.
 */
object TwitterPopularTags {

  /**
   * Entry point: configures the Twitter credentials, builds the streaming pipeline and runs it
   * until the streaming context terminates.
   *
   * @param args the four Twitter OAuth credentials followed by optional filter keywords
   */
  def main(args: Array[String]): Unit = {
    // All four credentials are mandatory: the destructuring below needs exactly four values
    // and would otherwise fail with a MatchError instead of printing the usage text.
    if (args.length < 4) {
      System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " +
        "<access token> <access token secret> [<filters>]")
      System.exit(1)
    }

    // StreamingExamples.setStreamingLogLevels()

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)

    // Everything after the four credentials is treated as a filter keyword.
    val filters = args.drop(4)

    // Expose the credentials as system properties; no explicit authorization object is passed
    // to createStream below (None), so the credentials are presumably read from these
    // properties by twitter4j.
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    // NOTE(review): the master is hard-coded to local[2] here; a master set on SparkConf in
    // code is expected to take precedence over spark-submit's --master flag.
    val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters)

    // Split each status text on single spaces and keep only the tokens starting with '#'.
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // Count every hashtag over the window, then flip to (count, topic) so the pairs can be
    // sorted by count in descending order.
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Print popular hashtags
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(5)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(5)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

sbt file:

// Root project definition: artifact name, version, Scala version and application entry point.
lazy val root = (project in file(".")).
  settings(
    name := "TwitterStreamProject",
    version := "1.0",
    scalaVersion := "2.11.7",
    // Must be the fully-qualified name (package + object) of the object that defines main.
    mainClass in Compile := Some("org.apache.spark.streaming.examples.TwitterPopularTags")
  )

// spark-core and spark-streaming are scoped "provided"; the Twitter connector has no such
// scope, so it is a regular compile dependency of the project.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided",
  // %% appends the project's Scala binary version to the artifact name, so this artifact
  // always matches scalaVersion instead of being pinned to a different Scala release.
  "org.apache.spark" %% "spark-streaming-twitter" % "1.2.0"
)

// Conflict resolution for the assembly jar: drop everything under META-INF and, for any other
// path that appears in more than one jar, keep the first occurrence.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}

assembly.sbt:

// Repository from which the sbt-assembly plugin is resolved.
resolvers += Resolver.url("sbt-plugin-releases-scalasbt", url("http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases/"))

// Adds the sbt-assembly plugin, which provides the `assembly` task and its settings.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")

Atul-Macbook:bin atul595525$ ./spark-submit \
--class "TwitterPopularTags" \
--master local[4] \
/Users/atul595525/Desktop/sparkdemo/twitterstreamdemo/target/scala-2.11/TwitterStreamProject-assembly-1.0.jar

Can you please help?

Thanks,
Atul