Unable to save data in Avro format using Spark Scala

spark-shell --packages com.databricks:spark-avro_2.10:2.0.1
import com.databricks.spark.avro._

val data = Seq(("James ","","Smith",2018,1,"M",3000),
("Michael ","Rose","",2010,3,"M",4000),
("Robert ","","Williams",2010,3,"M",4000),
("Maria ","Anne","Jones",2005,5,"F",4000),
("Jen","Mary","Brown",2010,7,"",-1)
)

val columns = Seq("firstname", "middlename", "lastname", "dob_year", "dob_month", "gender", "salary")

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val df = data.toDF(columns:_*)
df.write.format("avro").save("/user/modinesh/spark/avro/people.avro")

This gives the error below:

scala> df.write.format("avro").save("/user/modinesh/spark/avro/people.avro")
java.lang.ClassNotFoundException: Failed to find data source: avro. Please use Spark package http://spark-packages.org/package/databricks/spark-avro
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:72)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:219)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:54)
at $iwC$$iwC$$iwC.<init>(<console>:56)
at $iwC$$iwC.<init>(<console>:58)
at $iwC.<init>(<console>:60)
at <init>(<console>:62)
at .<init>(<console>:66)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:750)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: avro.DefaultSource
at scala.tools.nsc.interpreter.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:83)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:62)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:62)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:62)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:62)
at scala.util.Try.orElse(Try.scala:82)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:62)
... 51 more

Please help me out.



Please find the answer:

Even though you import the spark-avro package at the top, this version of Spark does not register the short data source name "avro", which is why format("avro") cannot be resolved. Use the fully qualified format name com.databricks.spark.avro instead.
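
In isolation, the corrected write looks like this. The df here is the DataFrame built above; the write.avro shorthand comes from the spark-avro implicits and assumes your spark-avro version provides it (it is documented in the spark-avro README):

// Fully qualified format name, resolved through the Databricks package on Spark 1.x:
df.write.format("com.databricks.spark.avro").save("/user/modinesh/spark/avro/people.avro")

// Alternative shorthand added by the spark-avro implicits:
import com.databricks.spark.avro._
df.write.avro("/user/modinesh/spark/avro/people.avro")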

scala> val data = Seq(("James ","","Smith",2018,1,"M",3000),
     | ("Michael ","Rose","",2010,3,"M",4000),
     | ("Robert ","","Williams",2010,3,"M",4000),
     | ("Maria ","Anne","Jones",2005,5,"F",4000),
     | ("Jen","Mary","Brown",2010,7,"",-1)
     | )
data: Seq[(String, String, String, Int, Int, String, Int)] = List((James ,,Smith,2018,1,M,3000), (Michael ,Rose,,2010,3,M,4000), (Robert ,,Williams,2010,3,M,4000), (Maria ,Anne,Jones,2005,5,F,4000), (Jen,Mary,Brown,2010,7,,-1))

scala> val columns = Seq("firstname", "middlename", "lastname", "dob_year", "dob_month", "gender", "salary")
columns: Seq[String] = List(firstname, middlename, lastname, dob_year, dob_month, gender, salary)

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala>

scala> val df = data.toDF(columns:_*)
df: org.apache.spark.sql.DataFrame = [firstname: string, middlename: string, lastname: string, dob_year: int, dob_month: int, gender: string, salary: int]

scala> df.count
res0: Long = 5

scala> df.write.format("com.databricks.spark.avro").save("/user/anuvenkatesheee/people")

scala> df.show(false)
+---------+----------+--------+--------+---------+------+------+
|firstname|middlename|lastname|dob_year|dob_month|gender|salary|
+---------+----------+--------+--------+---------+------+------+
|James    |          |Smith   |2018    |1        |M     |3000  |
|Michael  |Rose      |        |2010    |3        |M     |4000  |
|Robert   |          |Williams|2010    |3        |M     |4000  |
|Maria    |Anne      |Jones   |2005    |5        |F     |4000  |
|Jen      |Mary      |Brown   |2010    |7        |      |-1    |
+---------+----------+--------+--------+---------+------+------+

scala> val sqlContext= new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@3f806c54

scala> val df1 = sqlContext.read.format("com.databricks.spark.avro").load("/user/anuvenkatesheee/people")
df1: org.apache.spark.sql.DataFrame = [firstname: string, middlename: string, lastname: string, dob_year: int, dob_month: int, gender: string, salary: int]

scala> df1.show(false)
+---------+----------+--------+--------+---------+------+------+
|firstname|middlename|lastname|dob_year|dob_month|gender|salary|
+---------+----------+--------+--------+---------+------+------+
|James    |          |Smith   |2018    |1        |M     |3000  |
|Michael  |Rose      |        |2010    |3        |M     |4000  |
|Robert   |          |Williams|2010    |3        |M     |4000  |
|Maria    |Anne      |Jones   |2005    |5        |F     |4000  |
|Jen      |Mary      |Brown   |2010    |7        |      |-1    |
+---------+----------+--------+--------+---------+------+------+
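
As a side note, on Spark 2.4 and later Avro support is a built-in external module of Apache Spark, so the short name "avro" from the original attempt does work once that module is on the classpath. A minimal sketch, assuming Scala 2.11 and Spark 2.4.0 (adjust the coordinates to your build):

// Launch with the built-in Avro module instead of the Databricks package:
//   spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.0

// With the module on the classpath, the short format name resolves:
df.write.format("avro").save("/user/modinesh/spark/avro/people.avro")
val df2 = spark.read.format("avro").load("/user/modinesh/spark/avro/people.avro")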

It works like a charm.
Thanks venkateshm for your help.
