Let us go through the details related to advanced aggregations using rollup in Spark. Let us start the Spark context for this Notebook so that we can execute the code provided.
rollup
The rollup function in Spark is used for hierarchical aggregation. Given a set of columns, it aggregates at every prefix level of that hierarchy (subtotals) and also produces a grand total, so a single function call creates multiple levels of aggregation.
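As a plain-Python sketch (not Spark code), the grouping sets that rollup generates for a list of columns are every prefix of that list plus the empty set (the grand total):

```python
# Sketch: rollup over columns c1..cn aggregates at every prefix level --
# (c1, ..., cn), (c1, ..., cn-1), ..., (c1,), and () for the grand total.
def rollup_grouping_sets(columns):
    """Return rollup's grouping sets, from most to least detailed."""
    return [tuple(columns[:i]) for i in range(len(columns), -1, -1)]

print(rollup_grouping_sets(["order_year", "order_month", "order_date"]))
# [('order_year', 'order_month', 'order_date'),
#  ('order_year', 'order_month'), ('order_year',), ()]
```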
count
The count function is an aggregate function in Spark that is used to count the number of records in each group.
orderBy
The orderBy function is used to sort the resulting DataFrame based on the specified columns.
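As a plain-Python analogue (hypothetical rows, not Spark code), orderBy('order_date') corresponds to sorting the rows ascending by that column:

```python
# Hypothetical rows of (order_date, order_count); df.orderBy('order_date')
# sorts the DataFrame ascending by that column, much like sorted() here.
rows = [("2014-01-03", 5), ("2014-01-01", 2), ("2014-01-02", 7)]
ordered = sorted(rows, key=lambda r: r[0])
print(ordered)  # [('2014-01-01', 2), ('2014-01-02', 7), ('2014-01-03', 5)]
```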
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Basic Transformations'). \
    master('yarn'). \
    getOrCreate()
If you are going to use CLIs, you can launch Spark using one of the following three approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
orders = spark.read.json('/public/retail_db_json/orders')
orders.show()
orders.printSchema()
orders.count()
- Get count of orders rolled up by date.
from pyspark.sql.functions import count, lit
orders. \
    rollup('order_date'). \
    agg(count(lit(1)).alias('order_count')). \
    show()
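As a plain-Python sketch (hypothetical dates, not Spark code), rolling up by a single column yields the per-date counts plus one grand-total row whose order_date is null (None below):

```python
from collections import Counter

# Hypothetical order dates; rollup('order_date') returns one count per date
# plus a grand-total row where order_date is null (None here).
dates = ["2014-01-01", "2014-01-01", "2014-01-02"]
result = sorted(Counter(dates).items()) + [(None, len(dates))]
print(result)  # [('2014-01-01', 2), ('2014-01-02', 1), (None, 3)]
```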
- Get count of orders rolled up by month as well as date. You will see an additional subtotal record per month, along with a grand-total record.
from pyspark.sql.functions import date_format
orders. \
    rollup(date_format('order_date', 'yyyyMM').alias('order_month'), 'order_date'). \
    agg(count(lit(1)).alias('order_count')). \
    show()
- Get count of orders rolled up by year, month as well as date. You will see an additional subtotal record per month and per year, along with a grand-total record.
from pyspark.sql.functions import year
orders. \
    rollup(
        year('order_date').alias('order_year'),
        date_format('order_date', 'yyyyMM').alias('order_month'),
        'order_date'
    ). \
    agg(count(lit(1)).alias('order_count')). \
    show()
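The subtotal rows that rollup adds can be sketched in plain Python with hypothetical data (two levels shown for brevity; the year level simply adds one more subtotal row per year):

```python
from collections import Counter

# Hypothetical (order_month, order_date) pairs. rollup(order_month, order_date)
# counts per (month, date), per month (order_date null), and overall (both null).
rows = [("201401", "2014-01-01"), ("201401", "2014-01-02"), ("201402", "2014-02-01")]
detail = sorted(Counter(rows).items())
monthly = sorted(Counter(m for m, _ in rows).items())
result = (
    [(m, d, n) for (m, d), n in detail]
    + [(m, None, n) for m, n in monthly]     # month-level subtotals
    + [(None, None, len(rows))]              # grand total
)
print(result)
```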
- Filter by a specific month to get the aggregated data.
orders. \
    rollup(
        year('order_date').alias('order_year'),
        date_format('order_date', 'yyyyMM').alias('order_month'),
        'order_date'
    ). \
    agg(count(lit(1)).alias('order_count')). \
    filter("order_month = '201401'"). \
    show()
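In plain-Python terms (hypothetical rollup output, not Spark code), the filter on order_month keeps that month's daily rows and its month-level subtotal, while dropping the grand-total row, whose order_month is null:

```python
# Hypothetical rollup output rows: (order_month, order_date, order_count).
rollup_rows = [
    ("201401", "2014-01-01", 2),
    ("201401", "2014-01-02", 1),
    ("201401", None, 3),        # month-level subtotal
    ("201402", "2014-02-01", 1),
    ("201402", None, 1),
    (None, None, 4),            # grand total
]
# Filtering on order_month keeps detail rows and that month's subtotal.
filtered = [r for r in rollup_rows if r[0] == "201401"]
print(filtered)
```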
Hands-On Tasks
- Get count of orders rolled up by date.
- Get count of orders rolled up by month as well as date.
- Get count of orders rolled up by year, month as well as date.
- Filter by a specific month to get the aggregated data.
Conclusion
In this article, we explored how to aggregate data using rollup in Spark. The rollup function generates hierarchically aggregated results (subtotals and a grand total) from a single call, making it a powerful tool for advanced analytics. We encourage you to practice these concepts and engage with the community for further learning.