CLOUDERA VM : FileSystem ERROR : WRONG FS


#1
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.fs._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory

object Data {
  def main(args: Array[String]): Unit = {
    val prop = ConfigFactory.load()
    val conf = new SparkConf().setAppName("Hellow").setMaster(prop.getConfig(args(0)).getString("executionmode"))
    val sc = new SparkContext(conf)


    val input = "hdfs://quickstart.cloudera:8020" + args(1)
    val output = "hdfs://quickstart.cloudera:8020" + args(2)
    val fs = FileSystem.get(sc.hadoopConfiguration)
    if (!fs.exists(new Path(input))) {
      println("Input path Doesnot exists ")
    }
    else {
      if (fs.exists(new Path(output))) {
        fs.delete(new Path(output), true)
      }

      val data = sc.textFile(input).flatMap(x => x.split(" ")).map(x => (x, 1)).groupByKey().map(x => (x._1, x._2.sum)).sortBy(x => (-x._2))
      data.map(x => x._1 + "\t" + x._2).saveAsTextFile(output)
    }
  }
}

And in application.properties:
dev.executionmode=local
uat.executionmode=yarn-client

The arguments I passed:
dev /user/cloudera/datawc /user/cloudera/dddddd
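
That is, the values the program ends up with are roughly the following (a sketch, assuming application.properties is on the classpath):

// with args = Array("dev", "/user/cloudera/datawc", "/user/cloudera/dddddd"):
val mode   = prop.getConfig("dev").getString("executionmode")              // "local"
val input  = "hdfs://quickstart.cloudera:8020" + "/user/cloudera/datawc"   // hdfs://quickstart.cloudera:8020/user/cloudera/datawc
val output = "hdfs://quickstart.cloudera:8020" + "/user/cloudera/dddddd"   // hdfs://quickstart.cloudera:8020/user/cloudera/dddddd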

The error is:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs:/user/cloudera/datawc, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:69)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:516)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:398)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397)
at Data$.main(Data.scala:21)
at Data.main(Data.scala)

The fs does not work. Please help me.
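
For reference: the trace shows exists() being served by RawLocalFileSystem, which is why it expects file:/// and rejects the hdfs:// path. A minimal sketch of the usual workaround, assuming the quickstart.cloudera:8020 NameNode above, is to bind the FileSystem to the path's own URI instead of the default configuration:

// ask explicitly for the FileSystem that matches the hdfs:// scheme
val hdfs = FileSystem.get(new java.net.URI("hdfs://quickstart.cloudera:8020"), sc.hadoopConfiguration)

// or let the Path itself pick the right FileSystem implementation
val hdfs2 = new Path(input).getFileSystem(sc.hadoopConfiguration)

hdfs.exists(new Path(input))   // now checked against HDFS, not the local disk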


#2

Hello Lokesh, were you able to resolve this issue? Can you please tell me what the root cause is?


#3

Are you also running into the same issue?


#4

Yes. I am.

It was nice to see a reply from you, Durga. Let me take this opportunity to thank you for all that you are doing in the big data space and for sharing your knowledge with others.

I am following your Spark-Scala videos from last year. It is very basic code to check the existence of a Hadoop directory, read a few files, do transformations, and save the result to another Hadoop location as output.

If I give the input as the directory /user/zoheb/data, I get an error stating the directory is not found.
If I give the input as hdfs://localhost:9000/user/zoheb/data, then I get the error: Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS


#5

For your reference, here is a snippet:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import com.typesafe.config._
import org.apache.hadoop.fs.{FileSystem,Path}

object join_reduce {
  def main(args: Array[String]): Unit = {
    val props = ConfigFactory.load()

    val conf = new SparkConf().
      setMaster(props.getConfig(args(2)).getString("executionMode")).
      setAppName("Join_Reduce")
    val inputpathDir = args(0)
    val outputpath = args(1)

    val sc = new SparkContext(conf)
    val fs = FileSystem.get(sc.hadoopConfiguration)

    // check if input/output paths exist
    val ip = new Path(inputpathDir)
    if (!fs.exists(ip)) {
      println("Base directory does not exist")
      return
    }
    val op = new Path(outputpath)
    if (fs.exists(op))
      fs.delete(op, true)

    // reading orders table
    val orders = sc.textFile(inputpathDir + "/orders/part-m-00000")

#6

This is not complete code.

Also share details about how you are running the program and where you are running it (Windows or the Cloudera Quickstart VM).

Also share the error details you are getting. Don't paste the entire log; just give us the error that points to the issue.


#7

I am running the code in Eclipse on a VM provided by Edureka. Here is the complete code:

package FJR_package

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import com.typesafe.config._
import org.apache.hadoop.fs.{FileSystem,Path}
import org.apache.hadoop.conf.Configuration

object join_reduce {
  def main(args: Array[String]): Unit = {
    val props = ConfigFactory.load()

    //  sqoop import --connect jdbc:mysql://localhost/retail_db
    //  --username root --password Edurekasql_123 --table orders
    //  --m 1 --target-dir /user/edureka/data/orders
    //
    //  sqoop import --connect jdbc:mysql://localhost/retail_db
    //  --username root --password Edurekasql_123 --table order_items
    //  --m 1 --target-dir /user/edureka/data/order_items

    val conf = new SparkConf().
      setMaster(props.getConfig(args(2)).getString("executionMode")).
      setAppName("Join_Reduce")
    val inputpathDir = args(0)
    val outputpath = args(1)

    val sc = new SparkContext(conf)
    val fs = FileSystem.get(sc.hadoopConfiguration)

    // check if input/output paths exist
    if (!fs.exists(new Path(inputpathDir))) {
      println("Base directory does not exist")
      return
    }
    val op = new Path(outputpath)
    if (fs.exists(op))
      fs.delete(op, true)

    // reading orders table
    val orders = sc.textFile(inputpathDir + "/orders/part-m-00000")

    // filtering completed/closed order ids
    val orders_filter = orders.filter(rec => rec.split(",")(3) == "COMPLETE" || rec.split(",")(3) == "CLOSED")

    // selecting id and date
    val orders_date = orders_filter.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))

    // reading order_items table
    val order_items = sc.textFile(inputpathDir + "/order_items/part-m-00000")
    val orders_items_filter = order_items.map(rec => (rec.split(",")(1).toInt, rec.split(",")(4).toDouble))

    // joining orders and order_items
    val join_orders = orders_date.join(orders_items_filter)

    // selecting date and value
    val join_orders_map = join_orders.map(rec => (rec._2._1, rec._2._2))

    val join_orders_mapGBK = join_orders_map.reduceByKey((agg, value) => agg + value)
    join_orders_mapGBK.saveAsTextFile(outputpath)
  }
}

build.sbt:

name := "_6FilterJoinRBK"
version := "1.0"
scalaVersion := "2.10.4"

libraryDependencies += "com.typesafe" % "config" % "1.3.2"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "2.1.1"
libraryDependencies ++= Seq("org.apache.hadoop" % "hadoop-client" % "2.7.0")

application.properties:

dev.executionMode = local
prod.executionMode = yarn-client

My input arguments:
hdfs://localhost:9000/user/edureka/data hdfs://localhost:9000/user/edureka/data/DailyRevenueResult dev

Here is the error that I get:
Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://localhost:9000/user/edureka/data, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:647)
at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82)
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:601)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:819)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:596)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1424)
at FJR_package.join_reduce$.main(join_reduce.scala:31)

I have verified core-site.xml; we have the desired property there:

<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:9000</value>
</property>

Hope this helps. Please let me know if you need further info.
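
One thing I am not sure about is whether that core-site.xml is actually on the driver's classpath when the job runs from Eclipse with master "local"; if it is not, sc.hadoopConfiguration falls back to file:///. A quick diagnostic sketch (just an assumption on my part, not part of the job itself):

// what did the driver-side Hadoop configuration actually resolve?
println(sc.hadoopConfiguration.get("fs.defaultFS"))        // file:/// (or null) means core-site.xml was not picked up
println(FileSystem.get(sc.hadoopConfiguration).getUri)     // the file system that exists()/delete() will really hit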


#8

I was able to fix the issue by adding this code; I had to explicitly mention the resources:

sc.hadoopConfiguration.addResource(new Path("/usr/lib/hadoop-2.8.1/etc/hadoop/core-site.xml"))
val fs = FileSystem.get(new java.net.URI("hdfs://localhost:9000"), sc.hadoopConfiguration)
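
An equivalent tweak that seems to work (a sketch, assuming the same localhost:9000 NameNode) is to set the default file system on the configuration directly, or to derive the FileSystem from a fully qualified Path:

// option 1: set the default FS explicitly before asking for a FileSystem
sc.hadoopConfiguration.set("fs.defaultFS", "hdfs://localhost:9000")
val fs1 = FileSystem.get(sc.hadoopConfiguration)

// option 2: let a fully qualified Path choose its own FileSystem
val fs2 = new Path("hdfs://localhost:9000/user/edureka/data").getFileSystem(sc.hadoopConfiguration)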

I would however like to know why the configuration could not identify the fs.default.name on its own. Please let me know your thoughts.


#9

Is there a link for the VM provided by Edureka? It will help us set up the environment and troubleshoot the issue.


#10

/user/zoheb/data is an HDFS directory, not a directory on the local file system. You should give local file system details; by default, a typical Spark installation will identify the local file system.
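
In other words, both symptoms above come from the same thing (a sketch of the two failure modes, assuming a SparkContext sc and that fs.defaultFS was never loaded and so defaults to file:///):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
println(fs.getUri)   // file:/// when no core-site.xml / fs.defaultFS is on the classpath

fs.exists(new Path("/user/zoheb/data"))                        // checked on the LOCAL disk -> "directory not found"
fs.exists(new Path("hdfs://localhost:9000/user/zoheb/data"))   // scheme mismatch with file:/// -> "Wrong FS"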


#11

I know this is an old thread, but for anyone who is running into an issue with FileSystem.get(sc.hadoopConfiguration) in IntelliJ, I was able to solve it with the tweak below.

import org.apache.hadoop.fs.Path

val fs = org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration)