Data frames -- writing them in text format

Hi,

When I try to write a DataFrame that has two columns, it throws the error below. Does that mean we can't write more than one column as a text file?

Commands:
scala> val countbyorderstatus=sqlContext.sql("select order_status,count(1) count_status from orders_temp group by order_status")

scala> countbyorderstatus.write.format("text").mode("overwrite").save("/user/vimaldoss18/countbyorderstatus_text")

org.apache.spark.sql.AnalysisException: Text data source supports only a single column, and you have 2 columns.;
at org.apache.spark.sql.execution.datasources.text.DefaultSource.org$apache$spark$sql$execution$datasources$text$DefaultSource$$verifySchema(DefaultSource.scala:60)
at org.apache.spark.sql.execution.datasources.text.DefaultSource$$anonfun$createRelation$1.apply(DefaultSource.scala:52)
at org.apache.spark.sql.execution.datasources.text.DefaultSource$$anonfun$createRelation$1.apply(DefaultSource.scala:52)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.sql.execution.datasources.text.DefaultSource.createRelation(DefaultSource.scala:52)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:242)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)

There is no direct method to save a multi-column DataFrame as a text file; the text data source supports only a single column. Import the spark-csv library provided by Databricks and save it as a CSV file instead.

spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
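
For reference, a minimal sketch of a CSV write through the spark-csv data source (the "header" and "delimiter" option names follow the spark-csv documentation; the output path is illustrative, not from the original post):

// Write the two-column DataFrame as CSV via the Databricks spark-csv package
countbyorderstatus.write
  .format("com.databricks.spark.csv")
  .option("header", "true")      // include a header row with the column names
  .option("delimiter", ",")      // use "\t" here instead for tab-separated output
  .mode("overwrite")
  .save("/user/vimaldoss18/countbyorderstatus_csv")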

You can convert the DataFrame to an RDD using map and then use saveAsTextFile, as sketched below.
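
A minimal sketch of that approach, assuming the countbyorderstatus DataFrame from the original post (mkString joins every column of the Row, however many there are):

countbyorderstatus.rdd
  .map(row => row.mkString("\t"))   // render each Row as one tab-delimited line
  .saveAsTextFile("/user/vimaldoss18/countbyorderstatus_text")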

I tried converting the DataFrame to an RDD and saving it as a text file. I am able to do it, but it creates 200 tasks even though I set sqlContext.setConf("spark.sql.shuffle.partitions", "2"). Any idea?

This is how I am converting and storing it as a text file:

countByOrderStatus.map(rec=>rec(0)+"\t"+rec(1)).saveAsTextFile("/user/vimaldoss18/countByStatus_text")

Yes… This option also worked, but it still creates 200 tasks even though I specify sqlContext.setConf("spark.sql.shuffle.partitions", "2")…

@vimal.doss18 You can use the repartition method on the DataFrame to control the number of partitions before converting it into an RDD, i.e. when you call df.repartition(1).rdd.saveAsTextFile("/path-to-output-file") you will see only one file in the output folder.

In a real-world scenario I would recommend saving in the Parquet file format, as it is much more performant than the other formats.
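
For example, a minimal Parquet write might look like this (the output path is illustrative):

// Parquet is columnar, compressed, and preserves the DataFrame schema
countbyorderstatus.write
  .mode("overwrite")
  .parquet("/user/vimaldoss18/countbyorderstatus_parquet")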

@vimal.doss18 @venkatreddy-amalla

Don't use repartition(); use coalesce() instead:

df.coalesce(1).rdd.saveAsTextFile("/path-to-output-file")

This is because repartition() will shuffle the data.

coalesce uses the existing partitions to minimize the amount of data that is shuffled, while repartition creates new partitions and does a full shuffle. As a result, coalesce may produce partitions with very different amounts of data, whereas repartition produces roughly equal-sized partitions.
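
A minimal sketch contrasting the two, using the DataFrame from the original post (partition counts and output paths are illustrative):

val lines = countbyorderstatus.rdd.map(_.mkString("\t"))   // e.g. 200 partitions after the group by
lines.coalesce(2).saveAsTextFile("/user/vimaldoss18/out_coalesce")        // merges existing partitions, no full shuffle
lines.repartition(2).saveAsTextFile("/user/vimaldoss18/out_repartition")  // full shuffle, roughly equal-sized partitions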

Thank you @RavanthReddy and @venkatreddy-amalla!!! It helped me a lot…

spark-shell --packages com.databricks:spark-csv_2.10:1.5.0

import com.databricks.spark.csv._

Now you can write DataFrames out as CSV files.
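
If memory serves, that import adds an implicit saveAsCsvFile method on DataFrames; a hedged sketch (method name and option map per the spark-csv 1.x README, output path illustrative):

import com.databricks.spark.csv._
// saveAsCsvFile comes from the implicit brought in by the import above
countbyorderstatus.saveAsCsvFile("/user/vimaldoss18/countbyorderstatus_csv", Map("delimiter" -> ","))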

@vimal.doss18:

I just tested my code in the lab without this import, and it worked:

import com.databricks.spark.csv._

If you do not specify a format, the output is a plain text file (CSV- or TSV-style) by default.

You can convert:
DF.rdd.saveAsTextFile("")

Yes, if you really want to save the data tab-delimited, or with any field delimiter other than the default, you have to use DF.rdd.map(function).saveAsTextFile("").
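
A minimal sketch of that pattern, assuming a two-column DataFrame named DF (the output path is illustrative):

DF.rdd
  .map(row => row(0) + "\t" + row(1))   // format each Row with a tab delimiter
  .saveAsTextFile("/user/hypothetical/tab_delimited_output")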

Please let me know if it is not clear, and I will post my code for your reference.
Thanks
Venkat

Guys,

I don't see any difference in the output file content or format when I use df.rdd.map versus when I don't use .rdd. Both lines of code below gave the same output.

df.rdd.map(lambda a: str(a[0]) + ", " + str(a[1])).coalesce(1).saveAsTextFile("/user/cloudera/ashish/ans2/")

df.map(lambda a: str(a[0]) + ", " + str(a[1])).coalesce(1).saveAsTextFile("/user/cloudera/ashish/ans2/")

Can anyone help me understand the difference? I'm using Cloudera Quickstart VM 5.12 with Spark 1.6.2.
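
For what it's worth: in Spark 1.x, DataFrame.map(f) is simply defined as rdd.map(f), which is why both lines produce identical output. The Scala equivalent makes the delegation visible (df here is a hypothetical two-column DataFrame):

// These two are equivalent in Spark 1.x; df.map delegates to df.rdd.map
df.rdd.map(row => row(0) + ", " + row(1)).coalesce(1).saveAsTextFile("/user/hypothetical/out1")
df.map(row => row(0) + ", " + row(1)).coalesce(1).saveAsTextFile("/user/hypothetical/out2")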

I used the RDD map function to convert each row into one delimited text field and saved it as a text file.

Hi,

I am stuck writing the data of a DataFrame into a text file in HDFS. I am trying to use this code:
val result21=result2.coalesce(1).rdd
result21.map(x=>x.getString(0)).saveAsTextFile("/user/spark_practice/problem2/data/unique_stock")

But the code writes only the 1st column to that location.
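
That is expected: x.getString(0) selects only the first column of each Row. A minimal fix, assuming result2 is the DataFrame from the post above, is to join all of the columns instead:

val result21 = result2.coalesce(1).rdd
result21.map(x => x.mkString("\t"))   // every column of the Row, not just column 0
  .saveAsTextFile("/user/spark_practice/problem2/data/unique_stock")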