Processing JSON Data using Spark 2 (Python and Scala)

As part of this topic, we will see how to process heavy-weight JSON data using Spark 2, with examples in both Scala and Python.

  • Setup Data Sets
  • Data Processing - Overview
  • Copy the Data to HDFS
  • Reading Data
  • Understanding the Data
  • Answering Questions (Processing Data)
  • Performance Considerations

The demo will be given in our state-of-the-art Big Data labs. You can access the data directly without wasting time downloading and copying the data to HDFS.

Setup Data Sets

GitHub Activity Data is publicly available and can be downloaded for each hour using wget.

  • Here is a simple wget command which will download the data for a whole month.
  • Data will be downloaded for each hour in the month.
  • There will be one gz file for each hour, with the date and hour in the file name.
  • We can uncompress one of the files using gunzip and preview the data (a quick Python preview sketch follows this list). Uncompressing is not required to process the data using Spark 2.
  • I have downloaded data for multiple months by replacing the month in the wget command.
  • Once downloaded, we can use the du -sh command to get the size of the files.
  • If you are using our labs, you can skip this step and directly use the data under /public/githubactivity.
mkdir githubactivity
cd githubactivity
wget https://data.gharchive.org/2019-03-{01..31}-{0..23}.json.gz
du -sh .
  • We will copy this data to HDFS.
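
As an alternative to gunzip, here is a minimal Python sketch to preview the first record of one of the downloaded hourly files without uncompressing it (the file name is taken from the wget pattern above):

import gzip
import json

# Each hourly file contains one JSON document per line
with gzip.open("githubactivity/2019-03-01-0.json.gz", "rt") as f:
    first_record = json.loads(f.readline())

print(json.dumps(first_record, indent=2))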

Data Processing - Overview

Let us talk about the steps involved in processing the data using Spark 2.

  • We need to make sure the data is stored in a file system that the Spark executors running on multiple nodes of the cluster can access.
  • We can use HDFS, Amazon S3 or any other supported distributed or cloud-based file system.
  • In our case we will copy the data into HDFS and then process it.
  • Once the data is copied, we can use Spark APIs to develop the logic to process the data.
    1. Create Spark Context or Spark Session (Spark 2)
    2. Read the data from the file system into a Data Frame
    3. Process the data using Data Frame APIs
    4. Save the data into a file system or database
  • We will demonstrate the first 3 steps and then preview the data using Data Frame APIs (a minimal end-to-end skeleton is sketched after this list).
  • Here the focus will be on understanding how to process the JSON data.
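
Putting the four steps together, here is a minimal Python skeleton of the overall flow. It is a sketch, not the final solution: the processing logic and the output path /user/training/githubactivity_comments are assumptions, and the following sections walk through the read and process steps in detail.

from pyspark.sql import SparkSession

# 1. Create Spark Session (Spark 2)
spark = SparkSession.builder.appName("GitHub Activity Data Processing").getOrCreate()

# 2. Read the data from the file system into a Data Frame
data = spark.read.json("/public/githubactivity/2019-03-*.json.gz")

# 3. Process the data using Data Frame APIs (example: keep only activity with comments)
comments = data.filter("payload.comment.id IS NOT NULL").select("payload.comment.*", "repo.*")

# 4. Save the data into a file system (output path is an assumption)
comments.write.mode("overwrite").parquet("/user/training/githubactivity_comments")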

Copy the Data to HDFS

Now that the data is downloaded, let us copy it into HDFS.

  • We can use the hadoop fs -put command to copy the data from the local file system into HDFS.
hadoop fs -mkdir /user/`whoami`/githubactivity
hadoop fs -put -f ~/githubactivity/* /user/`whoami`/githubactivity/.
hadoop fs -ls /user/`whoami`/githubactivity/
hadoop fs -du -s -h /user/`whoami`/githubactivity/

Reading Data

Now let us understand how to read the data. The following are the typical steps.

  • Create Spark Context or Spark Session (Spark 2).
  • Spark Session exposes APIs to read data from different file formats.
  • In this case we need to use spark.read.json.
  • Instead of reading all the files, we will only read the files for March 2019.

Scala Version

val data = spark.
  read.
  json("/public/githubactivity/2019-03-*.json.gz")

data.printSchema

Python Version

data = spark. \
  read. \
  json("/public/githubactivity/2019-03-*.json.gz")

data.printSchema()
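
Once the Data Frame is created, we can quickly confirm what was read. Here is a small Python sketch; type and created_at are standard top-level fields in the GitHub event data, but treat the exact column names as an assumption until you verify them against the printSchema output.

# Number of events read for March 2019 (triggers a full scan of the files)
data.count()

# Peek at a couple of records using a few top-level columns (column names assumed)
data.select("type", "repo.name", "created_at").show(2, truncate=False)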

Understanding the Data

Let us understand a bit about the structure of the GitHub Activity Data.

  • We can use printSchema to print the schema.
  • Each activity contains the following information in the payload in the form of nested JSONs.
    • Comments
    • Commits
    • Description
    • Forks
    • Issues
    • and many more
  • Let us extract the comment information from the repositories. We will check whether payload.comment.id is not null to get only those activities where comments are involved (a preview sketch follows the code below).

Scala Version

data.
  filter("payload.comment.id IS NOT NULL").
  select("payload.comment.*", "repo.*").
  printSchema

Python Version

data. \
  filter("payload.comment.id IS NOT NULL"). \
  select("payload.comment.*", "repo.*"). \
  printSchema()
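
Beyond the schema, we can preview a few of the extracted comment records. A minimal Python sketch; the comment fields id and created_at are assumptions based on the schema printed above.

data. \
  filter("payload.comment.id IS NOT NULL"). \
  select("repo.name", "payload.comment.id", "payload.comment.created_at"). \
  show(5, truncate=False)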

Answering Questions (Processing Data)

Let us see how we can process the data. We can have as many problem statements as we want.

  • Problem Statements - Examples
    • Get the number of commits per repository in the whole month
    • Get the number of issues per repository in the whole month
    • Get the number of forks per repository in the whole month
    • You can spend time on the payload and come up with as many problem statements as you like
  • Solutions - let us pick up the second problem statement and come up with the solution.
    • Identify the fields using which we can get the number of issues.
    • Fetch those fields and then come up with the logic to get the number of issues per repository for a given dimension (date, week, month, year etc.).
    • We can identify the lowest granularity and aggregate the data to that level. In our case it can be daily.
    • Once the data is processed, we typically push it to the reporting database from which reports can be created (a write sketch is shown after the Python solution below).

Scala Solution

// to_date, count and lit come from org.apache.spark.sql.functions and $ from spark.implicits;
// spark2-shell imports these by default, so the imports are only needed outside the shell
import org.apache.spark.sql.functions.{count, lit, to_date}
import spark.implicits._

data.filter("payload.issue.id IS NOT NULL").
  select($"repo.id", $"repo.name", to_date($"payload.issue.created_at").alias("created_at")).
  groupBy("id", "name", "created_at").
  agg(count(lit(1)).alias("issue_count")).
  show

Python Solution

from pyspark.sql.functions import *
data.filter("payload.issue.id IS NOT NULL"). \
  select("repo.id", "repo.name", to_date("payload.issue.created_at").alias("created_at")). \
  groupBy("id", "name", "created_at"). \
  agg(count(lit(1)).alias("issue_count")). \
  show()
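
The last step from the overview, saving the processed data, is not shown above. Here is a minimal Python sketch of writing the daily issue counts back to HDFS as Parquet; the output path is an assumption, and a JDBC write to a reporting database would replace the last step in a real pipeline.

import getpass
from pyspark.sql.functions import to_date, count, lit

issue_counts = data. \
  filter("payload.issue.id IS NOT NULL"). \
  select("repo.id", "repo.name", to_date("payload.issue.created_at").alias("created_at")). \
  groupBy("id", "name", "created_at"). \
  agg(count(lit(1)).alias("issue_count"))

# Output path is an assumption; adjust for your environment
issue_counts.write. \
  mode("overwrite"). \
  parquet("/user/{}/githubactivity_issue_counts".format(getpass.getuser()))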

Performance Considerations

Let us go over some of the performance settings we have already applied. We will also see what happens if we leave them at their defaults.

  • Review the pyspark2 or spark2-shell command
    • spark.dynamicAllocation.enabled is set to false
    • num-executors is set to 16
    • executor-memory is set to 2G
    • and more
  • You need to sign up for the performance tuning workshop to understand more details about these settings.
  • If we launch pyspark2 or spark2-shell with the defaults (except for dynamic allocation), it will use only 2 executors and the data processing will take considerably longer (a sketch of the equivalent programmatic settings follows this list).
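
For reference, here is a minimal Python sketch of the equivalent settings when building a Spark Session programmatically. The values mirror the bullets above; in the labs these are normally passed on the pyspark2 or spark2-shell command line instead.

from pyspark.sql import SparkSession

# Equivalent of --num-executors 16 --executor-memory 2G with dynamic allocation disabled
spark = SparkSession.builder. \
    appName("GitHub Activity Data Processing"). \
    config("spark.dynamicAllocation.enabled", "false"). \
    config("spark.executor.instances", "16"). \
    config("spark.executor.memory", "2g"). \
    getOrCreate()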
