In this article, we will explore how to write DataFrames to various file formats in Spark. We will cover the APIs available for different formats, how to specify options, and how to handle existing directories.
Overview of Batch Write APIs
All batch write APIs are grouped under write, which is exposed on DataFrame objects. These APIs let you write DataFrames to different file formats, including:
- text: Write single-column data to text files.
- csv: Write data to text files with delimiters (default is a comma).
- json: Write data to JSON files.
- orc: Write data to ORC files.
- parquet: Write data to Parquet files.
You can also write data to other file formats using write.format, for example avro.
Writing DataFrames with Options
When writing DataFrames to files, you can specify various options:
- compression: Specify the compression codec (e.g., gzip, snappy).
- sep: Specify the delimiter when writing delimited text files using csv.
You can choose to overwrite existing directories or append to them using the mode option.
Example: Writing Data to Parquet Format
Let’s create a copy of orders data in Parquet file format with no compression. If the target folder already exists, we will overwrite it.
First, start the Spark context:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port', '0'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
appName(f'{username} | Python - Data Processing - Overview'). \
master('yarn'). \
getOrCreate()
If you prefer using CLIs, you can start Spark SQL using one of the following approaches:
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using PySpark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Create and Write Orders Data
Create a DataFrame for orders data:
orders = spark. \
read. \
csv('/public/retail_db/orders',
schema='''
order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
'''
)
orders.printSchema()
orders.show()
Write the DataFrame to Parquet format with no compression:
orders. \
write. \
parquet(f'/user/{username}/retail_db/orders',
mode='overwrite',
compression='none'
)
You can also achieve the same result using the option and format methods:
orders. \
write. \
mode('overwrite'). \
option('compression', 'none'). \
parquet(f'/user/{username}/retail_db/orders')
orders. \
write. \
mode('overwrite'). \
option('compression', 'none'). \
format('parquet'). \
save(f'/user/{username}/retail_db/orders')
Handling Multiple Files
By default, the number of files in the output directory is equal to the number of tasks used to process the data in the last stage. To avoid generating too many small files, you can control the number of output files using coalesce:
orders. \
coalesce(1). \
write. \
mode('overwrite'). \
option('compression', 'none'). \
parquet(f'/user/{username}/retail_db/orders')
Conclusion
Writing DataFrames to different file formats in Spark is straightforward with the batch write APIs. By specifying appropriate options and using methods like coalesce, you can efficiently manage file formats and control the output.