Avro Snappy Compression

apache-spark
compression
avro
certification

#1

I am trying to save the result of a Spark SQL query as Avro with Snappy compression, but it is not saving the file with Snappy compression; it is saved in uncompressed format. Verifying the file sizes shows the compressed and uncompressed files are the same. Please go through the code below.

    sqlContext.setConf("spark.sql.avro.compression.codec","snappy")
    import com.databricks.spark.avro._;
    df.write.avro(path)

Environment: Cloudera VM

Is there any issue with Avro-Snappy compression?
Is this the right way to apply compression?

Please advise.


#2

Try this:

    sqlContext.sql("set spark.sql.avro.compression.codec=snappy")


#3

@mxneelam Thanks for your reply. :slight_smile:
I have tried this code, but I am getting the same result as mentioned above.


#4

Hello Yeswanth,

Please see this link: http://www.itversity.com/topic/different-file-formats-python/

Also, did you start spark-shell with --packages com.databricks:spark-avro_2.10:2.0.1?

It should work. Try starting the shell with spark-shell --packages com.databricks:spark-avro_2.10:2.0.1, then:

    import com.databricks.spark.avro._
    sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
    df.write.avro(path)
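
As an aside, if I recall the spark-avro options correctly, the supported codec values are uncompressed, snappy, and deflate. A quick sketch of the deflate variant for comparison (a hedged example based on those documented options, with df and path as placeholders):

    // Assumption: spark-avro also accepts "deflate" with a separate
    // spark.sql.avro.deflate.level setting; adjust if your version differs.
    sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
    sqlContext.setConf("spark.sql.avro.deflate.level", "5")
    df.write.avro(path)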

Hope you will try it and let me know the outcome. :slight_smile:

Thanks,
Dinakar.


#5

@Yeswanth:
It should work per @Dinakar’s instructions.
Please post the solution in the community.
Thank you
Venkat


#6

Hi @Dinakar, @avr8082,

Thank you so much for your replies.

First, please confirm whether your code worked on the Cloudera VM or on some other cluster, because I have been working on the Cloudera VM.

I tried Dinakar's instructions, but I still see no change in file size or compression. First I generated a plain Avro file and a Snappy-compressed Avro file; then I launched spark-shell with the Avro package and again generated an Avro file with Snappy compression. In all three cases the size shows 1.9 MB.
Another thing: it is not showing a snappy extension; it shows only ".avro"!
Steps given below.

Environment: Cloudera VM

1. Generated a text file from the retail_db.order_items table, size 5.2 MB
2. Converted it to Avro without compression using Spark SQL; resulting size 1.9 MB
3. Converted it to Avro with Snappy compression using Spark SQL; resulting size 1.9 MB

Code:

    import com.databricks.spark.avro._;
    sqlContext.setConf("spark.sql.avro.compression.codec","snappy");
    resultdf.write.avro("/user/cloudera/resultavrosnappy");

4. Followed Dinakar's instructions: launched the shell with spark-shell --packages com.databricks:spark-avro_2.10:2.0.1

Code:

    import com.databricks.spark.avro._;
    sqlContext.setConf("spark.sql.avro.compression.codec","snappy");
    resultdf.write.avro("/user/cloudera/resultavrosnappy2");

But the resulting file again has size 1.9 MB.
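
As far as I understand, the Avro container records its codec in the file header and compresses data blocks internally, so the file keeps the plain ".avro" extension either way; the codec itself can be checked with avro-tools. A sketch of that check (the avro-tools jar ships with the Cloudera VM; the jar version and part-file name here are placeholders, adjust to what is actually on disk):

    hadoop jar /usr/lib/avro/avro-tools-1.7.6-cdh5.12.0.jar getmeta /user/cloudera/resultavrosnappy/part-r-00000.avro

getmeta prints the header metadata; the avro.codec entry should read snappy if compression was applied.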


#7

@Yeswanth,

It worked for me. Below is my code.

[cloudera@quickstart ~]$ spark-shell --master yarn --packages com.databricks:spark-avro_2.10:2.0.1
Ivy Default Cache set to: /home/cloudera/.ivy2/cache
The jars for the packages stored in: /home/cloudera/.ivy2/jars
:: loading settings :: url = jar:file:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.12.0-hadoop2.6.0-cdh5.12.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.databricks#spark-avro_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
	confs: [default]
	found com.databricks#spark-avro_2.10;2.0.1 in central
	found org.apache.avro#avro;1.7.6 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
	found org.codehaus.jackson#jackson-mapper-asl;1.9.13 in central
	found com.thoughtworks.paranamer#paranamer;2.3 in central
	found org.xerial.snappy#snappy-java;1.0.5 in central
	found org.apache.commons#commons-compress;1.4.1 in central
	found org.tukaani#xz;1.0 in central
	found org.slf4j#slf4j-api;1.6.4 in central
:: resolution report :: resolve 963ms :: artifacts dl 40ms
	:: modules in use:
	com.databricks#spark-avro_2.10;2.0.1 from central in [default]
	com.thoughtworks.paranamer#paranamer;2.3 from central in [default]
	org.apache.avro#avro;1.7.6 from central in [default]
	org.apache.commons#commons-compress;1.4.1 from central in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
	org.codehaus.jackson#jackson-mapper-asl;1.9.13 from central in [default]
	org.slf4j#slf4j-api;1.6.4 from central in [default]
	org.tukaani#xz;1.0 from central in [default]
	org.xerial.snappy#snappy-java;1.0.5 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   9   |   0   |   0   |   0   ||   9   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
	confs: [default]
	0 artifacts copied, 9 already retrieved (0kB/18ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/parquet/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/avro/avro-tools-1.7.6-cdh5.12.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
17/12/16 14:36:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/12/16 14:36:59 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Spark context available as sc (master = yarn-client, app id = application_1513415057165_0001).
SQL context available as sqlContext.

scala> val orders = sc.textFile("/user/cloudera/retail_db/orders")
orders: org.apache.spark.rdd.RDD[String] = /user/cloudera/retail_db/orders MapPartitionsRDD[1] at textFile at <console>:27

scala> orders.take(4).foreach(println)
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED

scala> val ordersDF = orders.map(x => {
     | val recs = x.split(",")
     | (recs(0).toInt,recs(1),recs(2).toInt,recs(3))}).toDF("order_id","order_date","cust_id","status")
ordersDF: org.apache.spark.sql.DataFrame = [order_id: int, order_date: string, cust_id: int, status: string]

scala> import com.databricks.spark.avro._
import com.databricks.spark.avro._

scala> sqlContext.setConf("spark.sql.avro.compression.codec","snappy")
17/12/16 14:43:51 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.1.0-cdh5.12.0
17/12/16 14:43:52 WARN metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException

scala> ordersDF.write.avro("/user/cloudera/resultSnappy")
                                                                                
scala> 


[cloudera@quickstart ~]$ hdfs dfs -ls -h /user/cloudera/resultSnappy
Found 3 items
-rw-r--r--   1 cloudera supergroup          0 2017-12-16 14:44 /user/cloudera/resultSnappy/_SUCCESS
-rw-r--r--   1 cloudera supergroup    359.7 K 2017-12-16 14:44 /user/cloudera/resultSnappy/part-r-00000-ba66055f-f4da-49f6-bb40-f9b5c9ab3a97.avro
-rw-r--r--   1 cloudera supergroup    360.7 K 2017-12-16 14:44 /user/cloudera/resultSnappy/part-r-00001-ba66055f-f4da-49f6-bb40-f9b5c9ab3a97.avro
[cloudera@quickstart ~]$

#8

Hi @Dinakar
Sorry for the delayed response.
Please find my code below; I am still getting the same result. Also, please let me know whether we can launch spark-shell with the Databricks packages during the certification exam.

Code:

[cloudera@quickstart ~]$ spark-shell --master yarn --packages com.databricks:spark-avro_2.10:2.0.1
Ivy Default Cache set to: /home/cloudera/.ivy2/cache
The jars for the packages stored in: /home/cloudera/.ivy2/jars
:: loading settings :: url = jar:file:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.10.0-hadoop2.6.0-cdh5.10.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.databricks#spark-avro_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found com.databricks#spark-avro_2.10;2.0.1 in central
found org.apache.avro#avro;1.7.6 in central
found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
found org.codehaus.jackson#jackson-mapper-asl;1.9.13 in central
found com.thoughtworks.paranamer#paranamer;2.3 in central
found org.xerial.snappy#snappy-java;1.0.5 in central
found org.apache.commons#commons-compress;1.4.1 in central
found org.tukaani#xz;1.0 in central
found org.slf4j#slf4j-api;1.6.4 in central
:: resolution report :: resolve 3969ms :: artifacts dl 192ms
:: modules in use:
com.databricks#spark-avro_2.10;2.0.1 from central in [default]
com.thoughtworks.paranamer#paranamer;2.3 from central in [default]
org.apache.avro#avro;1.7.6 from central in [default]
org.apache.commons#commons-compress;1.4.1 from central in [default]
org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
org.codehaus.jackson#jackson-mapper-asl;1.9.13 from central in [default]
org.slf4j#slf4j-api;1.6.4 from central in [default]
org.tukaani#xz;1.0 from central in [default]
org.xerial.snappy#snappy-java;1.0.5 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   9   |   0   |   0   |   0   ||   9   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 9 already retrieved (0kB/94ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/parquet/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
17/12/17 06:44:56 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:951)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:689)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:878)
Spark context available as sc (master = yarn-client, app id = application_1513518646527_0003).
17/12/17 06:47:06 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.1.0
17/12/17 06:47:08 WARN metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException
SQL context available as sqlContext.

scala> val data=sc.textFile("/user/cloudera/order_items");
data: org.apache.spark.rdd.RDD[String] = /user/cloudera/order_items MapPartitionsRDD[1] at textFile at <console>:27

scala> data.map(x=>(x.split(","))).map(x=>(x(0),x(1),x(2),x(3),x(4),x(5))).toDF("order_item_id","order_item_order_id","order_item_product_id","order_item_quantity","order_item_subtotal","order_item_product_price").registerTempTable("order_items");

scala> val result=sqlContext.sql("select * from order_items");
result: org.apache.spark.sql.DataFrame = [order_item_id: string, order_item_order_id: string, order_item_product_id: string, order_item_quantity: string, order_item_subtotal: string, order_item_product_price: string]

scala> import com.databricks.spark.avro._;
import com.databricks.spark.avro._

scala> result.repartition(1).write.avro("/user/cloudera/ResultAvro");

scala> sqlContext.setConf("spark.sql.avro.compression.codec","snappy");

scala> result.repartition(1).write.avro("/user/cloudera/ResultAvroSnappy");

[cloudera@quickstart ~]$ hadoop fs -ls -h /user/cloudera/ResultAvro
Found 2 items
-rw-r--r--   1 cloudera cloudera          0 2017-12-17 06:51 /user/cloudera/ResultAvro/_SUCCESS
-rw-r--r--   1 cloudera cloudera      1.9 M 2017-12-17 06:51 /user/cloudera/ResultAvro/part-r-00000-ff714e94-260d-4ec5-a4b9-516d89cd5cdd.avro
[cloudera@quickstart ~]$ hadoop fs -ls -h /user/cloudera/ResultAvroSnappy
Found 2 items
-rw-r--r--   1 cloudera cloudera          0 2017-12-17 06:52 /user/cloudera/ResultAvroSnappy/_SUCCESS
-rw-r--r--   1 cloudera cloudera      1.9 M 2017-12-17 06:52 /user/cloudera/ResultAvroSnappy/part-r-00000-5e846b0e-9e47-4d5f-b6c2-11f48cb9075d.avro
[cloudera@quickstart ~]$
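
One more thing I want to rule out: if I remember the spark-avro 2.0.1 defaults correctly, spark.sql.avro.compression.codec already defaults to snappy, so the "uncompressed" baseline above may itself have been Snappy-compressed, which would explain the identical 1.9 M sizes. A sketch to test this (output paths are hypothetical):

    // Assumption: the default codec may already be snappy, so force
    // "uncompressed" explicitly, then snappy, and compare file sizes.
    import com.databricks.spark.avro._
    sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
    result.repartition(1).write.avro("/user/cloudera/ResultAvroPlain")
    sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
    result.repartition(1).write.avro("/user/cloudera/ResultAvroSnappy2")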


#9

Hi @Yeswanth,

I'm not sure about launching spark-shell with the Avro package during the exam; I have not taken it yet.

Thanks,
Dinakar


#10

@Dinakar

Do you think the above issue is related to the Cloudera VM?