Apache Spark 2.x - Processing Data using Data Frames - Basic Transformations - Data Frame Operations - Performing Aggregations using sum, avg etc

Performing aggregations

Let us see how to perform aggregations within each group.

  • We have functions such as sum, avg, min, max etc. which can be used to aggregate the data.
  • We need to create a WindowSpec object using partitionBy to get aggregations within each group.
  • Some realistic use cases (sketches for two of these are shown at the end of this section)
    • Get the average salary for each department and get details of all the employees who earn more than the average salary
    • Get the average revenue for each day and get all the orders that earn more than the average revenue for that day
    • Get the highest order revenue and get all the orders whose revenue is more than 75% of the highest revenue
from pyspark.sql import SparkSession

spark = SparkSession.\
    builder.\
    appName('Explore Window Aggregate Functions').\
    master('local').\
    getOrCreate()

employeePath = "/public/hr_db/employees/part-m-00001"

employees = spark.read.format('csv').option('sep',"\t").schema('''
employee_id INT,
first_name STRING,
last_name STRING,
email STRING,
phone_number STRING,
hire_date STRING,
job_id STRING,
salary FLOAT,
commission_pct STRING,
manager_id STRING,
department_id STRING
''').\
load(employeePath)


type(employees)

employees.printSchema()

employees.select('employee_id','department_id','salary').show()

from pyspark.sql.window import Window

from pyspark.sql.functions import min, max, avg, sum

spec = Window.partitionBy('department_id')

employees.select('employee_id','department_id','salary').\
withColumn('salary_expense',sum('salary').over(spec)).\
show()

employees.select('employee_id','department_id','salary').\
withColumn('salary_expense',sum('salary').over(spec)).\
sort('department_id').show()

employees.select('employee_id','department_id','salary').\
withColumn('salary_expense',sum('salary').over(spec)).\
withColumn('least_salary',min('salary').over(spec)).\
sort('department_id').show()

employees.select('employee_id','department_id','salary').\
withColumn('salary_expense',sum('salary').over(spec)).\
withColumn('highest_salary',max('salary').over(spec)).\
sort('department_id').show()

employees.select('employee_id','department_id','salary').\
withColumn('salary_expense',sum('salary').over(spec)).\
withColumn('average_salary',avg('salary').over(spec)).\
sort('department_id').show()

employees.select('employee_id','department_id','salary').\
withColumn('salary_expense',sum('salary').over(spec)).\
withColumn('least_salary',min('salary').over(spec)).\
withColumn('highest_salary',max('salary').over(spec)).\
withColumn('average_salary',avg('salary').over(spec)).\
sort('department_id').show()

from pyspark.sql.functions import col
from pyspark.sql.functions import round

employees.select('employee_id','department_id','salary').\
withColumn('salary_expense',sum('salary').over(spec)).\
withColumn('salary_pct', round((employees.salary/col('salary_expense')) * 100 , 2)).\
sort('department_id').show()


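The revenue use cases follow the same pattern on an orders dataset, which is not loaded in this section. The sketch below is therefore only illustrative: it assumes a hypothetical order_revenues DataFrame with order_id, order_date and revenue columns, and keeps the orders whose revenue is more than 75% of the highest order revenue on the same date.

from pyspark.sql.window import Window
from pyspark.sql.functions import max, col

# order_revenues is a hypothetical DataFrame with order_id, order_date and revenue columns
spec_by_date = Window.partitionBy('order_date')

order_revenues.\
withColumn('highest_revenue', max('revenue').over(spec_by_date)).\
filter(col('revenue') > 0.75 * col('highest_revenue')).\
sort('order_date').show()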