Cannot assign requested address


#1

While I'm trying to integrate Flume with Spark Streaming, I'm facing the error below. I tried different ports, but no luck.

The Flume agent starts without issue and the streaming program also compiles successfully.

When I run the spark-submit command as below, I get this error:
Caused by: java.net.BindException: Cannot assign requested address
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:372)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:296)
at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
... 3 more

################## Flume - Spark Integration ####################################

## Name the components on this agent
wk.sources = ws
wk.sinks = hd spark
wk.channels = hdmem sparkmem

# Describe the source
wk.sources.ws.type = exec
wk.sources.ws.command = tail -F /opt/gen_logs/logs/access.log

# Describe the sinks
wk.sinks.hd.type = hdfs
wk.sinks.hd.hdfs.path = hdfs://nn01.itversity.com:8020/user/kirantadisetti/flume_demo
wk.sinks.hd.hdfs.filePrefix = Flumedemo
wk.sinks.hd.hdfs.fileSuffix = .txt
wk.sinks.hd.hdfs.rollInterval = 120
wk.sinks.hd.hdfs.rollSize = 1048576
wk.sinks.hd.hdfs.fileType = DataStream

wk.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
wk.sinks.spark.hostname = gw02.itversity.com
wk.sinks.spark.port = 9783

# Describe the channels
wk.channels.hdmem.type = memory
wk.channels.hdmem.capacity = 1000
wk.channels.hdmem.transactionCapacity = 100

wk.channels.sparkmem.type = memory
wk.channels.sparkmem.capacity = 1000
wk.channels.sparkmem.transactionCapacity = 100

# Bind the source and sinks to the channels
wk.sources.ws.channels = hdmem sparkmem
wk.sinks.hd.channel = hdmem
wk.sinks.spark.channel = sparkmem

Start the agent:

flume-ng agent -n wk -f flume_spark.conf

Update the build.sbt file with the following dependencies:

name := "myproject"
scalaVersion := "2.10.6"
publishMavenStyle := false
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.3"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.3"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "1.6.3"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume-sink_2.10" % "1.6.3"
libraryDependencies += "org.scala-lang" % "scala-library" % "2.10.5"
libraryDependencies += "org.apache.commons" % "commons-lang3" % "3.3.2"

sbt package

Write the Spark program that uses the Flume APIs with Spark Streaming:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.flume._

object FlumeStreamingWordCount {

  def main(args: Array[String]) {

    val conf = new SparkConf().
      setAppName("Flume Streaming Word Count").
      setMaster(args(0))

    val ssc = new StreamingContext(conf, Seconds(30))

    val flumeStream = FlumeUtils.createStream(ssc, args(1), args(2).toInt)

    // convert Flume events to strings
    val messages = flumeStream.map(s => new String(s.event.getBody.array()))

    // keep only requests whose endpoint starts with /department
    val departmentMessages = messages.filter(msg => {
      val endpoint = msg.split(" ")(6)
      endpoint.split("/")(1) == "department"
    })

    // extract the department name and pair it with a count of 1
    val finalDeptMsgs = departmentMessages.map(msg => {
      val endpoint = msg.split(" ")(6)
      (endpoint.split("/")(2), 1)
    })

    val departmentWiseCount = finalDeptMsgs.reduceByKey(_ + _)

    departmentWiseCount.saveAsTextFiles("/user/kirantadisetti/kafka-spark/outputs/deparmentwordcount")

    ssc.start()
    ssc.awaitTermination()
  }
}

Make sure the dependency jar files exist in the locations below (see --jars):

spark-submit \
--class FlumeStreamingWordCount \
--packages org.apache.spark:spark-streaming-flume_2.10:1.6.2 \
--conf spark.ui.port=24534 \
--master yarn \
--jars "/usr/hdp/2.5.0.0-1245/spark/lib/spark-streaming-flume-sink_2.10-1.6.2.jar,/usr/hdp/2.5.0.0-1245/spark/lib/spark-streaming-flume_2.10-1.6.2.jar,/usr/hdp/2.5.0.0-1245/flume/lib/commons-lang3-3.5.jar,/usr/hdp/2.5.0.0-1245/spark/lib/spark-streaming_2.10-1.6.2.jar" \
myproject_2.10-0.1-SNAPSHOT.jar yarn-client 172.16.1.109 9783


#2

Please update the thread and let me know how to resolve this.


#3

Hi Kiran

After investigating, we found a couple of issues:

  1. In FlumeStreamingWordCount.scala, FlumeUtils.createPollingStream() should be used instead of FlumeUtils.createStream(). createPollingStream() is used for the pull-based approach, while createStream() is used for the push-based approach. So in this scenario we are pulling the messages that Flume has written to the sink (see the sketch after these steps for the two approaches side by side).
    val stream = FlumeUtils.createPollingStream(ssc, args(1), args(2).toInt)

  2. Run spark-submit with the following jar files from the right folders:

    spark-submit \
    --class FlumeStreamingWordCount \
    --master yarn \
    --conf spark.ui.port=12986 \
    --jars "/usr/hdp/current/flume-server/lib/spark-streaming-flume-sink_2.10-1.6.3.jar,/usr/hdp/current/flume-server/lib/spark-streaming-flume_2.10-1.6.3.jar,/usr/hdp/current/flume-server/lib/commons-lang3-3.5.jar,/usr/hdp/current/flume-server/lib/flume-ng-sdk-1.5.2.2.6.5.0-292.jar" \
    myproject_2.10-1.0.jar yarn-client gw02.itversity.com 8008

<myproject_2.10-1.0.jar> will be the new jar file produced by the compilation. Port 8008 has been updated in the Flume conf file (it could also be 9783, as specified before).
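
For reference, here is a minimal sketch contrasting the two approaches. The object name FlumePullVsPush is hypothetical, and the hostname/port are taken from the Flume config earlier in this thread; adjust them for your setup.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePullVsPush {
  def main(args: Array[String]) {
    val ssc = new StreamingContext(new SparkConf().setAppName("FlumePullVsPush"), Seconds(30))

    // Push-based: Spark starts an Avro receiver that binds to this host/port and
    // Flume's avro sink pushes events to it. If the host is not an address of the
    // machine running the receiver, the bind fails with
    // "java.net.BindException: Cannot assign requested address".
    // val pushed = FlumeUtils.createStream(ssc, "gw02.itversity.com", 8008)

    // Pull-based: Flume runs org.apache.spark.streaming.flume.sink.SparkSink, which
    // binds the host/port itself; Spark only connects to it and pulls the events.
    // This matches the SparkSink configured in flume_spark.conf.
    val pulled = FlumeUtils.createPollingStream(ssc, "gw02.itversity.com", 8008)

    // Print the event bodies just to verify data is flowing.
    pulled.map(e => new String(e.event.getBody.array())).print()

    ssc.start()
    ssc.awaitTermination()
  }
}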

After making the changes, recompile the program with sbt and run it again. It will work.