Unable to Save data in Avro format



Hi,

I am trying to save data in Avro format from the Spark shell after creating a DataFrame. Below are the steps I am performing.

dataframe.save("path", "avro") -- did not work
dataframe.write.avro("path") -- worked

spark-shell --master yarn --conf spark.ui.port=32421 --packages com.databricks:spark-avro_2.11:4.0.0

sqlContext.setConf("spark.sql.shuffle.partitions", "2")

import com.databricks.spark.avro._

Reading orders data and creating the orders DataFrame:

val ordersRDD = sc.textFile("/user/satishp38/Data/retail_db/orders/")
val ordersDF = ordersRDD.map(x => (x.split(",")(0).toInt, x.split(",")(1), x.split(",")(2).toInt, x.split(",")(3))).toDF("Order_id", "Order_date", "Order_Customer_Id", "Order_Status")
ordersDF.registerTempTable("OrdersDF")

Reading products data and creating the products DataFrame:

val productsRaw = scala.io.Source.fromFile("/home/satishp38/public/retail_db/products/part-00000").getLines.toList
val productsRDD = sc.parallelize(productsRaw)
val productsDF = productsRDD.map(x => (x.split(",")(0).toInt, x.split(",")(2))).toDF("Product_id", "Product_Name")
productsDF.registerTempTable("products")

Reading order_items data and creating the order_items DataFrame:

val oiRaw = scala.io.Source.fromFile("/home/satishp38/public/retail_db/order_items/part-00000").getLines.toList
val oiRDD = sc.parallelize(oiRaw)
val oiDF = oiRDD.map(x => (x.split(",")(0).toInt, x.split(",")(1).toInt, x.split(",")(2).toInt, x.split(",")(4).toFloat)).toDF("Order_item_id", "Order_item_order_id", "Order_item_Product_id", "Order_item_SubTotal")
oiDF.registerTempTable("order_items")

Joining the orders, order_items, and products tables to get daily revenue per product:

val daily_revenue_per_product = sqlContext.sql("select o.order_date, p.product_name, round(sum(oi.Order_item_SubTotal)) as daily_revenue_Per_Products " +
"from OrdersDF o join order_items oi on o.order_id = oi.Order_item_order_id " +
"join products p on oi.Order_item_Product_id = p.product_id " +
"where o.order_status in ('CLOSED','COMPLETE') " +
"group by o.order_date, p.product_name " +
"order by o.order_date, daily_revenue_Per_Products desc")

Saving the DataFrame in Avro format (did not work):

daily_revenue_per_product.save("/user/satishp38/spark/daily_revenue_save_avro", "avro")

Error: java.lang.ClassNotFoundException: Failed to find data source: avro. Please use Spark package http://spark-packages.org/package/databricks/spark-avro

daily_revenue_per_product.save("/user/satishp38/spark/daily_revenue_save_avro", "com.databricks.spark.avro")

Error: java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormat
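For reference, the spark-avro README documents saving through DataFrameWriter.format with the fully-qualified source name; if I am reading it correctly, the snippet below should be equivalent to write.avro (the output path is just a placeholder I made up):

```scala
// Sketch based on the spark-avro README; assumes the shell was started with
// --packages com.databricks:spark-avro_2.11:4.0.0. Note that spark-avro 4.0.0
// targets Spark 2.x, so the NoClassDefFoundError above may indicate the
// running Spark version is older than the one the package was built for.
import com.databricks.spark.avro._

daily_revenue_per_product.write
  .format("com.databricks.spark.avro")
  .save("/user/satishp38/spark/daily_revenue_format_avro") // placeholder path
```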

I was able to save the data in JSON and Parquet (and write.avro worked as well):

daily_revenue_per_product.save("/user/satishp38/spark/daily_revenue_save_json", "json")
daily_revenue_per_product.write.json("/user/satishp38/spark/daily_revenue_write_json")
daily_revenue_per_product.save("/user/satishp38/spark/daily_revenue_save_parquet", "parquet")
daily_revenue_per_product.write.parquet("/user/satishp38/spark/daily_revenue_write_parquet")
daily_revenue_per_product.write.avro("/user/satishp38/spark/daily_revenue_write_avro")
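To double-check the Avro output that did get written, reading it back in the same session should work; a sketch using the read.avro implicit from the same package:

```scala
// Assumes the same spark-shell session with spark-avro on the classpath
import com.databricks.spark.avro._

val checkDF = sqlContext.read.avro("/user/satishp38/spark/daily_revenue_write_avro")
checkDF.show(5)
```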