Spark-shell (DataFrame)



Can anybody help me with the exception below? It occurs when I try to write the resultant data of the following program to HDFS:

var products = sc.textFile("/user/cloudera/problem2/products").map(x => {
  var d = x.split('|')
  (d(0).toInt, d(1).toInt, d(2).toString, d(3).toString, d(4).toFloat, d(5).toString)
})
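(A side note on the parsing step, in case it matters: split('|') uses the Char overload, which splits literally on the pipe. With a String argument the pipe is a regex metacharacter and would need escaping, i.e. the equivalent form would be:)

x.split('|')    // Char overload: literal split on '|'
x.split("\\|")  // String overload: regex, so the pipe must be escaped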

case class Products(product_id: Integer, product_category_id: Integer, product_name: String, product_description: String, product_price: Float, product_image: String)

import org.apache.spark.sql.functions._  // for col, max, countDistinct, round, avg, min

var dfResult = products
  .map(x => Products(x._1, x._2, x._3, x._4, x._5, x._6))
  .toDF()
  .filter("product_price < 100")
  .groupBy(col("product_category_id"))
  .agg(
    max(col("product_price")).alias("max_price"),
    countDistinct(col("product_id")).alias("total_products"),
    round(avg(col("product_price")), 2).alias("avg_price"),
    min(col("product_price")).alias("min_price"))
  .orderBy(col("product_category_id"))

dfResult.show // up to here the code works fine

// when I try to save the resultant DataFrame to the given location, the exception below is thrown

scala> dfResult.write.avro("/user/cloudera/problem2/products/result-df")
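(For reference, .write.avro is not part of core Spark; it comes from the spark-avro package, which I have loaded in the shell:)

import com.databricks.spark.avro._  // provides the .avro(...) method on DataFrameWriter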

It wrote 56 files into the directory and then failed. Why is this happening, and why does it create two hundred files in the resultant directory?

[Stage 23:==================================> (34 + 2) / 56]

18/05/30 20:24:21 WARN hdfs.DFSClient:
Caught exception java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:952)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:690)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:879)
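From what I have read, the two hundred files would come from spark.sql.shuffle.partitions, which defaults to 200: the groupBy triggers a shuffle, and the writer creates one output file per partition of the result. The DFSClient InterruptedException above is only a WARN logged while output streams are closed after a task is interrupted, so I suspect it is a symptom of the failure rather than the root cause. Here is a sketch of what I plan to try, assuming Spark 1.6 on CDH where sqlContext is available in the shell (on Spark 2.x it would be spark.conf.set instead):

// Option 1: lower the shuffle partition count before the aggregation runs,
// so the result has fewer partitions (and fewer output files)
sqlContext.setConf("spark.sql.shuffle.partitions", "8")

// Option 2: collapse the result into a single partition just before writing
dfResult.coalesce(1).write.avro("/user/cloudera/problem2/products/result-df")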

Any help would be appreciated.

Thanks