Apache Spark Python - Data Processing Overview - Overview of Spark Write APIs

In this article, we will explore how to write DataFrames to various file formats in Spark. We will cover the APIs available for different formats, how to specify options, and how to handle existing directories.

Overview of Batch Write APIs

All batch write APIs are grouped under write, an attribute of DataFrame objects that returns a DataFrameWriter. These APIs allow you to write DataFrames to different file formats, including:

  • text: Write a single string column to text files.
  • csv: Write data to text files with delimiters (default is a comma).
  • json: Write data to JSON files.
  • orc: Write data to ORC files.
  • parquet: Write data to Parquet files.

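You can also write data to other file formats using write.format followed by save. One example is avro, which requires the external spark-avro module to be available to the Spark session.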

Writing DataFrames with Options

When writing DataFrames to files, you can specify various options:

  • compression: Specify the compression codec (e.g., gzip, snappy).
  • sep: Specify the delimiter when writing delimited text files using csv.

You can choose to overwrite an existing directory or append to it using mode. By default, the write fails if the target directory already exists.

Example: Writing Data to Parquet Format

Let’s create a copy of orders data in Parquet file format with no compression. If the target folder already exists, we will overwrite it.

First, start the Spark session:

from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Data Processing - Overview'). \
    master('yarn'). \
    getOrCreate()

If you prefer using CLIs, you can launch Spark using one of the following approaches:

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using PySpark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Create and Write Orders Data

Create a DataFrame for orders data:

orders = spark. \
    read. \
    csv('/public/retail_db/orders',
        schema='''
            order_id INT, 
            order_date STRING, 
            order_customer_id INT, 
            order_status STRING
        '''
       )

orders.printSchema()
orders.show()

Write the DataFrame to Parquet format with no compression:

orders. \
    write. \
    parquet(f'/user/{username}/retail_db/orders', 
            mode='overwrite', 
            compression='none'
           )

You can also achieve the same result by chaining the mode and option methods, either with parquet or with format and save:

orders. \
    write. \
    mode('overwrite'). \
    option('compression', 'none'). \
    parquet(f'/user/{username}/retail_db/orders')

orders. \
    write. \
    mode('overwrite'). \
    option('compression', 'none'). \
    format('parquet'). \
    save(f'/user/{username}/retail_db/orders')

Handling Multiple Files

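By default, the number of files in the output directory is equal to the number of tasks used to process the data in the last stage. To avoid generating too many small files, you can reduce the number of output files using coalesce, which merges partitions without a full shuffle. Keep in mind that coalesce(1) writes the whole data set in a single task, so it is best suited to small outputs: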

orders. \
    coalesce(1). \
    write. \
    mode('overwrite'). \
    option('compression', 'none'). \
    parquet(f'/user/{username}/retail_db/orders')

Conclusion

Writing DataFrames to different file formats in Spark is straightforward with the batch write APIs. By specifying options such as compression and sep, choosing a write mode, and using methods like coalesce, you can control the file format, the handling of existing directories, and the number of output files.