Wordcount program throws an error when running locally

cloudera
scala
#1

Hi

I am trying to work out the wordcount program exercise in my Windows development environment by creating a jar file and exporting it to the Cloudera VM. I am using IntelliJ, as mentioned by Durga in his video.

YouTube video link:

Please see my program below.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object wc {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("word_count")
    val sc = new SparkContext(conf)

    val randomtext = sc.textFile("hdfs://192.168.56.101:8888/home/cloudera/Mani/data/word_count/wordcount_test.txt")
    randomtext.flatMap(rec => rec.split(" ")).
      map(rec => (rec, 1)).
      reduceByKey((agg, value) => agg + value).
      collect().foreach(println)
  }
}

I am getting the following error message when I try to run the program. Can anyone please advise?

Error:

Using Spark’s default log4j profile: org/apache/spark/log4j-defaults.properties
17/06/18 12:08:05 INFO SparkContext: Running Spark version 1.6.0
17/06/18 12:08:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
17/06/18 12:08:06 INFO SecurityManager: Changing view acls to: Mani
17/06/18 12:08:06 INFO SecurityManager: Changing modify acls to: Mani
17/06/18 12:08:06 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Mani); users with modify permissions: Set(Mani)
17/06/18 12:08:07 INFO Utils: Successfully started service ‘sparkDriver’ on port 57819.
17/06/18 12:08:08 INFO Slf4jLogger: Slf4jLogger started
17/06/18 12:08:08 INFO Remoting: Starting remoting
17/06/18 12:08:08 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:57832]
17/06/18 12:08:08 INFO Utils: Successfully started service ‘sparkDriverActorSystem’ on port 57832.
17/06/18 12:08:08 INFO SparkEnv: Registering MapOutputTracker
17/06/18 12:08:08 INFO SparkEnv: Registering BlockManagerMaster
17/06/18 12:08:08 INFO DiskBlockManager: Created local directory at C:\Users\Mani\AppData\Local\Temp\blockmgr-33b4c0d4-9208-4e23-94a2-85aa057f5bb9
17/06/18 12:08:08 INFO MemoryStore: MemoryStore started with capacity 2.4 GB
17/06/18 12:08:08 INFO SparkEnv: Registering OutputCommitCoordinator
17/06/18 12:08:09 WARN Utils: Service ‘SparkUI’ could not bind on port 4040. Attempting port 4041.
17/06/18 12:08:09 WARN Utils: Service ‘SparkUI’ could not bind on port 4041. Attempting port 4042.
17/06/18 12:08:09 INFO Utils: Successfully started service ‘SparkUI’ on port 4042.
17/06/18 12:08:09 INFO SparkUI: Started SparkUI at http://192.168.56.1:4042
17/06/18 12:08:09 INFO Executor: Starting executor ID driver on host localhost
17/06/18 12:08:09 INFO Utils: Successfully started service ‘org.apache.spark.network.netty.NettyBlockTransferService’ on port 57839.
17/06/18 12:08:09 INFO NettyBlockTransferService: Server created on 57839
17/06/18 12:08:09 INFO BlockManagerMaster: Trying to register BlockManager
17/06/18 12:08:09 INFO BlockManagerMasterEndpoint: Registering block manager localhost:57839 with 2.4 GB RAM, BlockManagerId(driver, localhost, 57839)
17/06/18 12:08:09 INFO BlockManagerMaster: Registered BlockManager
17/06/18 12:08:10 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 107.7 KB, free 107.7 KB)
17/06/18 12:08:10 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.8 KB, free 117.5 KB)
17/06/18 12:08:10 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:57839 (size: 9.8 KB, free: 2.4 GB)
17/06/18 12:08:10 INFO SparkContext: Created broadcast 0 from textFile at wc.scala:14
17/06/18 12:08:10 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:293)
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$33.apply(SparkContext.scala:1015)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$33.apply(SparkContext.scala:1015)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)
at scala.Option.map(Option.scala:145)
at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:176)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:195)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:331)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:331)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:330)
at wc$.main(wc.scala:17)
at wc.main(wc.scala)
Exception in thread “main” java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Protocol message tag had invalid wire type.; Host Details : local host is: “LAPTOP-KS8RRVDO/192.168.56.1”; destination host is: “quickstart.cloudera”:8888;
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764)
at org.apache.hadoop.ipc.Client.call(Client.java:1351)
at org.apache.hadoop.ipc.Client.call(Client.java:1300)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy18.getFileInfo(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy18.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:651)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1679)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102)
at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1701)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1647)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:222)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:331)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:331)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:330)
at wc$.main(wc.scala:17)
at wc.main(wc.scala)
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message tag had invalid wire type.
at com.google.protobuf.InvalidProtocolBufferException.invalidWireType(InvalidProtocolBufferException.java:99)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:498)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:461)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:579)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:280)
at com.google.protobuf.CodedInputStream.readGroup(CodedInputStream.java:240)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:488)
at com.google.protobuf.GeneratedMessage.parseUnknownField(GeneratedMessage.java:193)
at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto.<init>(RpcHeaderProtos.java:1404)
at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto.<init>(RpcHeaderProtos.java:1362)
at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto$1.parsePartialFrom(RpcHeaderProtos.java:1492)
at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto$1.parsePartialFrom(RpcHeaderProtos.java:1487)
at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:200)
at com.google.protobuf.AbstractParser.parsePartialDelimitedFrom(AbstractParser.java:241)
at com.google.protobuf.AbstractParser.parseDelimitedFrom(AbstractParser.java:253)
at com.google.protobuf.AbstractParser.parseDelimitedFrom(AbstractParser.java:259)
at com.google.protobuf.AbstractParser.parseDelimitedFrom(AbstractParser.java:49)
at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto.parseDelimitedFrom(RpcHeaderProtos.java:2364)
at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:996)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:891)
17/06/18 12:08:16 INFO SparkContext: Invoking stop() from shutdown hook
17/06/18 12:08:16 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4042
17/06/18 12:08:16 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/06/18 12:08:16 INFO MemoryStore: MemoryStore cleared
17/06/18 12:08:16 INFO BlockManager: BlockManager stopped
17/06/18 12:08:16 INFO BlockManagerMaster: BlockManagerMaster stopped
17/06/18 12:08:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/06/18 12:08:16 INFO SparkContext: Successfully stopped SparkContext
17/06/18 12:08:16 INFO ShutdownHookManager: Shutdown hook called
17/06/18 12:08:16 INFO ShutdownHookManager: Deleting directory C:\Users\Mani\AppData\Local\Temp\spark-881265a7-934e-47d2-96d7-36ebb7638a3f
17/06/18 12:08:16 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.

Process finished with exit code 1

#2

@manikandanra You get this error because your program is not able to find the HADOOP_HOME directory, and therefore cannot locate winutils.exe.

  1. First, download winutils.exe from the link below:
     http://public-repo-1.hortonworks.com/hdp-win-alpha/winutils.exe
  2. Create a folder such as c:\winutils\bin.
  3. Copy winutils.exe into the created folder c:\winutils\bin.
  4. Create HADOOP_HOME as a system environment variable pointing to the winutils folder, or use the statement below at the start of the 'main' code block (see the sketch after the snippet).

System.setProperty("hadoop.home.dir", "C:\\winutils")
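
For reference, here is a minimal sketch of how the driver could look with this property set before the SparkContext is created. The winutils folder is the one suggested in step 2 above; the only requirement is that the call runs before new SparkContext(conf).

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object wc {
  def main(args: Array[String]): Unit = {
    // Point Hadoop at the folder whose bin subfolder contains winutils.exe
    // (assumes winutils.exe was copied to C:\winutils\bin as described above).
    System.setProperty("hadoop.home.dir", "C:\\winutils")

    val conf = new SparkConf().setMaster("local").setAppName("word_count")
    val sc = new SparkContext(conf)

    // ... the rest of the word count logic from the original program goes here ...

    sc.stop()
  }
}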

#3

Is this applicable for Cloudera? I set up the Cloudera VM on my machine, not Hortonworks.

Another doubt: I am running the wordcount program from IntelliJ installed in my local Windows environment, but the input file is located remotely in the Cloudera VM. That is why I have given the Cloudera VM IP address in the path. Is this the correct way of doing it, or do I need to install Spark locally?

#4

@manikandanra If you build a jar and run it in the Cloudera VM, then you need not set the property, as the VM will already have HADOOP_HOME set in it.
But if you want to run the program on Windows, then you need to add this property.
On your question about accessing the file in the VM from Windows: I don't think it is possible. You cannot run the program on Windows against a file that does not exist on your local system. That is the reason we build a jar and then copy it to the cluster (VM); we use Windows only for application development and unit testing.
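
For reference, a common pattern for this jar-based workflow is to avoid hard-coding the master and the input path and instead pass them in as arguments, so the same jar can be tested locally during development and then run on the VM. The argument handling below is only an illustrative assumption, not part of the original program:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object wc {
  def main(args: Array[String]): Unit = {
    // args(0): master ("local" while developing on Windows, e.g. "yarn-client" on the VM)
    // args(1): input path (a local file while developing, an HDFS path on the VM)
    val master = if (args.length > 0) args(0) else "local"
    val input  = if (args.length > 1) args(1) else "wordcount_test.txt"

    val conf = new SparkConf().setMaster(master).setAppName("word_count")
    val sc = new SparkContext(conf)

    sc.textFile(input).
      flatMap(rec => rec.split(" ")).
      map(rec => (rec, 1)).
      reduceByKey((agg, value) => agg + value).
      collect().foreach(println)

    sc.stop()
  }
}

On the VM, the jar would then typically be run with spark-submit, passing the master and the HDFS path of the input file as the two arguments.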

#5

Thanks very much, Ashok. The problem is solved.

#6

Thanks, Ashok!
Issue resolved.
