Apache Spark Python - Basic Transformations - Aggregate data using rollup

Let us go through the details related to advanced aggregations using rollup in Spark. We will start the Spark context for this Notebook so that we can execute the code provided.

rollup

The rollup function in Spark is used for hierarchical aggregation. Given a list of grouping columns, it aggregates over every leading prefix of that list, from the most detailed grouping down to the grand total. It helps in creating multiple levels of aggregation using a single function call.
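As a mental model (plain Python, not Spark code), rollup over a list of columns aggregates over every leading prefix of that list, ending with the empty grouping for the grand total. A minimal sketch:

```python
def rollup_grouping_sets(columns):
    """Return the grouping sets rollup(columns) aggregates over:
    every leading prefix of the column list, down to the grand total ()."""
    return [tuple(columns[:i]) for i in range(len(columns), -1, -1)]

# rollup('order_year', 'order_month', 'order_date') aggregates over:
print(rollup_grouping_sets(['order_year', 'order_month', 'order_date']))
# [('order_year', 'order_month', 'order_date'),
#  ('order_year', 'order_month'),
#  ('order_year',),
#  ()]
```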

count

The count function is an aggregation function in Spark that is used to count the number of records in each group.

orderBy

The orderBy function is used to sort the resulting DataFrame based on the specified columns.

from pyspark.sql import SparkSession

import getpass

username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Basic Transformations'). \
    master('yarn'). \
    getOrCreate()

If you are going to use CLIs, you can launch Spark using one of the following 3 approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
orders = spark.read.json('/public/retail_db_json/orders')
orders.show()
orders.printSchema()
orders.count()
  • Get count of orders rolled up by date. You will see an additional record with the grand total.
from pyspark.sql.functions import count, lit

orders. \
    rollup('order_date'). \
    agg(count(lit(1)).alias('order_count')). \
    show()
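To see what that extra record looks like, here is a plain-Python emulation (not Spark) of a single-column rollup over a few hypothetical dates: one count per date, plus a grand-total row where the rolled-up column is None (Spark renders it as null):

```python
from collections import Counter

dates = ['2014-01-01', '2014-01-01', '2014-01-02']  # hypothetical sample

per_date = Counter(dates)                    # groupBy-style counts per date
rows = sorted(per_date.items())              # one row per order_date
rows.append((None, sum(per_date.values())))  # rollup's extra grand-total row

for order_date, order_count in rows:
    print(order_date, order_count)
# 2014-01-01 2
# 2014-01-02 1
# None 3
```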
  • Get count of orders rolled up by month as well as date. You will see an additional record per month along with the grand total.
from pyspark.sql.functions import date_format

orders. \
    rollup(date_format('order_date', 'yyyyMM').alias('order_month'), 'order_date'). \
    agg(count(lit(1)).alias('order_count')). \
    show()
  • Get count of orders rolled up by year, month as well as date. You will see an additional record per month as well as per year, along with the grand total.
from pyspark.sql.functions import year

orders. \
    rollup(
        year('order_date').alias('order_year'),
        date_format('order_date', 'yyyyMM').alias('order_month'), 
        'order_date'
    ). \
    agg(count(lit(1)).alias('order_count')). \
    show()
  • Filter by a specific month to get the aggregated data.
orders. \
    rollup(
        year('order_date').alias('order_year'),
        date_format('order_date', 'yyyyMM').alias('order_month'), 
        'order_date'
    ). \
    agg(count(lit(1)).alias('order_count')). \
    filter("order_month = '201401'"). \
    show()
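The orderBy function described earlier makes rollup output readable: in Spark you would chain .orderBy('order_year', 'order_month', 'order_date') after agg, and in ascending order nulls (the subtotal rows) sort first by default. A plain-Python sketch of that nulls-first ordering over hypothetical rollup rows:

```python
# Hypothetical rollup rows: (order_month, order_date, order_count);
# None marks a rolled-up (subtotal) level, like Spark's null.
rows = [
    ('201401', '2014-01-02', 1),
    (None, None, 3),
    ('201401', None, 3),
    ('201401', '2014-01-01', 2),
]

# Emulate orderBy('order_month', 'order_date'): ascending, nulls first.
def null_first_key(row):
    return tuple((value is not None, value) for value in row[:2])

for row in sorted(rows, key=null_first_key):
    print(row)
# (None, None, 3)
# ('201401', None, 3)
# ('201401', '2014-01-01', 2)
# ('201401', '2014-01-02', 1)
```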


Hands-On Tasks

  1. Get count of orders rolled up by date.
  2. Get count of orders rolled up by month as well as date.
  3. Get count of orders rolled up by year, month as well as date.
  4. Filter by a specific month to get the aggregated data.

Conclusion

In this article, we explored how to aggregate data using rollup in Spark. The rollup function allows us to generate hierarchically aggregated results, making it a powerful tool for advanced analytics. We encourage you to practice these concepts and engage with the community for further learning.