Building Streaming Pipelines - Databricks

As part of this topic, we will see how to ingest data in real time using the Kafka ecosystem and process it using Spark Structured Streaming on top of Databricks.


If you want to learn more about the Databricks Platform, you can sign up for our Udemy Course.

Feel free to subscribe to our YouTube channel for free live sessions.

You can also join our Slack Workspace to stay in touch with us.


Agenda

Here is the agenda for this module or section. We will build an end-to-end streaming analytics pipeline using Kafka and Spark, with Scala as the programming language, on top of the Databricks platform.

  • Define Problem Statement
  • Setup EC2 Instance
  • Simulate Web Server Logs
  • Setup and Start Kafka Broker
  • Create Kafka Topic and Validate
  • Ingest Web Server Logs into Kafka Topic
  • Setup Databricks Cluster
  • Integrate and Validate with Spark Structured Streaming
  • Streaming Data Processing using Data Frame APIs
  • Perform Data Analysis using Spark SQL

Define Problem Statement

As we are getting web server logs in a streaming fashion, we want to analyze the traffic for each department.

  • gen_logs will publish the data to a log file, simulating user traffic to an eCommerce platform.
  • Data is published using a standard log format and has several fields such as IP address, timestamp, endpoint, etc. (see the sample line after this list).
  • We just want to compute how many visits are happening for each department every minute.
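
For illustration, here is a hypothetical log line in the Apache-style format the simulator emits, along with how the endpoint and department can be extracted from it. This is only a sketch; the exact fields in your logs may differ.

// A hypothetical access log line; field 6 (0-based, split on spaces) is the endpoint.
val sampleLine = """123.45.67.89 - - [21/Jun/2020:16:03:27 -0800] "GET /department/Golf/categories HTTP/1.1" 200 1034"""

val endpoint = sampleLine.split(" ")(6)   // "/department/Golf/categories"
val department = endpoint.split("/")(2)   // "Golf" (index 1 is "department")
println(department)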

Setup EC2 Instance

Let us go ahead and set up an EC2 instance in AWS to host the web log generator, which will generate log messages in a streaming fashion.

If you already have a server with a public IP, you can use it to set up the simulator. However, you need to ensure that port 9092, on which our Kafka Broker will be set up, is open.

  • Login to AWS and go to EC2 Dashboard
  • Create a new EC2 instance of type t2.small.
  • Make sure to use an existing Key Pair or create a new one.
  • Open port numbers 22 and 9092 as part of the security group that will be created while launching EC2 instance.
  • Once the EC2 instance is up and running, ensure that you can connect to the server using SSH.

Assign Elastic IP address

By default, the public IP address of an EC2 instance is dynamic; if we stop the EC2 instance and start it again, the IP address as well as the DNS alias will change. Let us see how we can address this issue.

  • Make sure the server is running without any issues.
  • Go to the EC2 Dashboard and then to Network and Security.
  • Click on Elastic IPs and create a new one by clicking on Allocate New Address.
  • Once the address is allocated, we can select it and attach it to the server where we want to set up the environment to run the log message simulator and Kafka Broker.

Simulate Web Server Logs

Let us simulate web server logs using this Python 3 based simulator.

  • Make sure to have git installed - sudo yum -y install git
  • Clone the repository - git clone https://github.com/dgadiraju/gen-logs-python3.git
  • Make sure Python 3 is installed on the server - sudo yum -y install python3
  • Move the folder gen-logs-python3/gen_logs to /opt - sudo mv -f gen-logs-python3/gen_logs /opt
  • Change the ownership of the folder to centos - sudo chown -R centos:centos /opt/gen_logs
  • Update PATH in the appropriate .profile or by using the export command - export PATH=$PATH:/opt/gen_logs
  • Run start_logs.sh to start generating logs inside /opt/gen_logs/logs
  • Validate by running tail_logs.sh; the console should refresh with new messages in real time.

Setup and Start Kafka Broker

Let us set up Kafka on the same server and start all the relevant components.

  • Make sure to install wget - sudo yum -y install wget
  • Make sure to install JDK 1.8 - sudo yum -y install java-1.8.0-openjdk
  • Download the Kafka binary (kafka_2.12-2.3.1.tgz) from the Apache Kafka downloads page. We can use wget to download the file.
  • Untar and decompress the archive - tar xzf kafka_2.12-2.3.1.tgz
  • Move it to /opt/kafka and update the PATH in the relevant profile.
sudo mv -f kafka_2.12-2.3.1 /opt/kafka
sudo chown -R centos:centos /opt/kafka
export PATH=$PATH:/opt/kafka/bin
  • Make sure to install telnet so that we can validate whether services are coming up or not. - sudo yum -y install telnet
  • Start and Validate Zookeeper Service
zookeeper-server-start.sh \
  -daemon /opt/kafka/config/zookeeper.properties
telnet localhost 2181
  • Update advertised.listeners in /opt/kafka/config/server.properties with the public DNS of the AWS instance (e.g., advertised.listeners=PLAINTEXT://<public-dns>:9092).
  • Start and Validate Kafka Broker
kafka-server-start.sh \
  -daemon /opt/kafka/config/server.properties
telnet localhost 9092

Create Kafka Topic and Validate

Let us create a Kafka topic and start redirecting the tail_logs.sh output to it.

  • Create Kafka Topic
kafka-topics.sh --zookeeper localhost:2181 \
  --create \
  --topic retail \
  --partitions 2 \
  --replication-factor 1

kafka-topics.sh --zookeeper localhost:2181 \
  --list \
  --topic retail 

kafka-topics.sh --zookeeper localhost:2181 \
  --describe \
  --topic retail
  • Produce Messages to Kafka Topic
tail_logs.sh | kafka-console-producer.sh \
  --broker-list localhost:9092 \
  --topic retail
  • Validate by consuming messages from the Kafka Topic.
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic retail

While we can redirect the output of tail_logs.sh to the Kafka topic, it is not reliable. Whenever we stop and start the producer, we will end up losing some data. It is better to use Kafka Connect to get the data from the log file into the Kafka topic.

Ingest Web Server Logs into Kafka Topic

Let us set up Kafka Connect to get data from /opt/gen_logs/logs/access.log to Kafka Topic retail.

  • Kafka Connect can be used to get data from standard sources to Kafka Topic as well as from Kafka Topic to standard targets.
  • We can configure Kafka Connect in standalone mode or distributed mode.
  • For this demo we will use standalone mode. Let us create a directory named ksdemo - mkdir ksdemo
  • Let us copy the configuration files for standalone Kafka Connect from /opt/kafka/config to ksdemo.
cd ksdemo
cp /opt/kafka/config/connect-standalone.properties .
cp /opt/kafka/config/connect-file-source.properties .
  • We need to configure the Kafka broker and offset location using connect-standalone.properties as a reference. We also need to change key.converter and value.converter to get the data as plain text; by default it uses JSON.
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
  • We need to specify the source file location and the topic name to which the data needs to be pushed, using connect-file-source.properties as a reference.
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/opt/gen_logs/logs/access.log
topic=retail
  • Once we are ready with both the properties files, we can start the Kafka Connect process in standalone mode using connect-standalone.sh
nohup \
  connect-standalone.sh \
  connect-standalone.properties \
  connect-file-source.properties \
  &
  • Make sure to validate by consuming messages from the Kafka Topic.
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic retail

Make sure the server's public DNS or IP is captured so that we can use it to consume the data using Spark Structured Streaming.

Setup Databricks Cluster

Let us set up a Databricks cluster and validate it to ensure that we can use it for our purpose.

  • I would highly recommend using the full edition of Databricks rather than the Community Edition.
  • Create an interactive cluster with 2 to 3 m4.large nodes, or whatever instance type you have quotas for in your cloud account.
  • You need to launch a notebook with the programming language of your choice (in our case, Scala).
  • Validate by running %fs ls /FileStore/tables to ensure that we can access DBFS from the notebook that comes as part of the Databricks cluster.
  • We can also run shell commands by using %sh and queries against Spark SQL tables using %sql.
  • We can use the telnet command from the Databricks notebook to ensure that we are able to talk to the Kafka broker running on the EC2 instance, as shown in the sketch after this list.
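
If running telnet from a notebook cell is not convenient, a quick connectivity check can also be done from Scala on the driver. This is just a sketch; replace the host with your broker's public DNS (the one shown here is the example host used later in this module).

import java.net.{InetSocketAddress, Socket}

// Example host; replace with your own EC2 public DNS.
val brokerHost = "ec2-3-234-136-14.compute-1.amazonaws.com"
val socket = new Socket()
socket.connect(new InetSocketAddress(brokerHost, 9092), 5000) // 5 second timeout
println(s"Connected to $brokerHost:9092")
socket.close()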

Integrate and Validate with Spark Structured Streaming

We will use Spark Structured Streaming with Kafka as the source to read data from the Kafka topic and then dump the messages into files every 30 seconds.

Make sure to perform these validations before executing Spark Structured Streaming code to consume data from Kafka Topic.

  • Make sure the Databricks cluster can telnet to the public IP or DNS on which the Kafka broker is running. The Kafka broker runs on port 9092.
  • Make sure the public DNS is configured using advertised.listeners in Kafka (if you are using servers which are bound to multiple IP addresses).
  • With cloud providers, if the server is stopped and started in between, the public DNS might have changed. You need to make sure that advertised.listeners is configured with a valid public DNS or IP.
  • Get the topic metadata and understand the number of partitions with which the topic was created.

Reading Data

Spark 2.x has an API called readStream which can be used to read data from a Kafka topic.

var streamingInputDF = 
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ec2-3-234-136-14.compute-1.amazonaws.com:9092")
    .option("subscribe", "retail")     
    .option("startingOffsets", "latest")  
    .option("minPartitions", "2")  
    .option("failOnDataLoss", "true")
    .load()
    .select($"value".cast("string"))
    .as[(String)]
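
For reference, before the select above the Kafka source exposes key, value, topic, partition, offset, timestamp and timestampType columns. A quick sketch (reusing the same options) to confirm the schema:

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "ec2-3-234-136-14.compute-1.amazonaws.com:9092")
  .option("subscribe", "retail")
  .load()
  .printSchema()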

Printing on the Console

import org.apache.spark.sql.streaming.Trigger

val query =
  streamingInputDF
    .writeStream
    .format("console")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start()

Writing Data

import org.apache.spark.sql.streaming.Trigger

val query =
  streamingInputDF
    .writeStream
    .format("csv")
    .outputMode("append")
    .option("path", "/FileStore/tables/retail-logs-data")
    .option("checkpointLocation", "/FileStore/tables/retail-logs-check")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start()
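
Once the query is started, the StreamingQuery handle returned by start() can be used to monitor progress and to stop the stream before re-running with different options. A minimal sketch:

println(query.status)        // whether a trigger is active and data is available
println(query.lastProgress)  // metrics of the most recent micro-batch (null until the first batch completes)

// Stop the query before re-running the cell with different options.
// query.stop()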

Streaming Data Processing using Data Frame APIs

Let us understand how we can perform streaming data processing using Spark Structured Streaming Data Frame APIs.

  • There are 3 output modes in Spark Structured Streaming
    • complete
    • update
    • append
  • Depending upon the type of operations we want to perform and the sink we write to, we need to use the appropriate output mode.
Operations     Sink Types     Output Modes
filter         files          append
filter         console        append, update
filter         memory         append
filter         foreach (db)   append, update
aggregations   files          append (only to unique directories)
aggregations   console        append, update
aggregations   memory         append
aggregations   foreach (db)   append, update, complete
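
To illustrate one of these combinations, here is a minimal sketch (not used in the rest of this module) of a streaming aggregation written to the console sink in update mode. It reuses streamingInputDF from the earlier Reading Data step.

import org.apache.spark.sql.functions.{col, split}
import org.apache.spark.sql.streaming.Trigger

// Count visits per department; an aggregation like this cannot be appended to files
// under a single directory, but it can go to the console sink in update mode.
val departmentCounts = streamingInputDF
  .filter("split(split(value, ' ')[6], '/')[1] = 'department'")
  .select(split(split(col("value"), " ")(6), "/")(2).alias("department_name"))
  .groupBy("department_name")
  .count()

val aggQuery = departmentCounts
  .writeStream
  .format("console")
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()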

In our case we are writing the data into files under one directory, and hence we can only apply filters (no aggregations).

  • We will read the data in streaming fashion.
  • Apply a filter to keep only the endpoints that contain a department.
  • Extract visit_time and department_name from the filtered data.
  • Write the output to the target directory. As we have already used these directories, let us first clean up both the data and checkpoint folders.
%fs rm /FileStore/tables/retail-logs-data -r
%fs rm /FileStore/tables/retail-logs-check -r

Reading Data

import org.apache.spark.sql.functions.col
 
var streamingInputDF = 
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ec2-3-234-136-14.compute-1.amazonaws.com:9092")
    .option("subscribe", "retail")     
    .option("startingOffsets", "latest")  
    .option("minPartitions", "2")  
    .option("failOnDataLoss", "true")
    .load()
    .select(col("value").cast("string"))
    .as[(String)]

Processing and Writing Data

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions.{col, to_timestamp, split}

val query =
  streamingInputDF
    .filter("split(split(value, ' ')[6], '/')[1] = 'department'")
    .select(to_timestamp(split(col("value"), " ")(3), "[dd/MMM/yyyy:HH:mm:ss").alias("visit_time"), 
       split(split(col("value"), " ")(6), "/")(2).alias("department_name"))
    .writeStream
    .format("csv")
    .outputMode("append")
    .option("path", "/FileStore/tables/retail-logs-data")
    .option("checkpointLocation", "/FileStore/tables/retail-logs-check")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start()
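
As a quick sanity check (a sketch, not part of the original walkthrough), once a few micro-batches have completed, the CSV files can be read back as a static Data Frame to confirm the output:

val writtenLogs = spark.read
  .csv("/FileStore/tables/retail-logs-data")
  .toDF("visit_time", "department_name") // columns in the order written by the streaming query

writtenLogs.show(10, false)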

Perform Data Analysis using Spark SQL

As the data is being loaded into the folder, we can now create an external table and develop queries to analyze the data further.

  • Create external table
CREATE EXTERNAL TABLE retail_logs_data (
  visit_time STRING,
  department_name STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/FileStore/tables/retail-logs-data';
  • Queries to analyze the data
SELECT department_name, count(1)
FROM retail_logs_data
GROUP BY department_name;

SELECT date_trunc('MINUTE', visit_time) visit_minute,
  department_name, 
  count(1)
FROM retail_logs_data
GROUP BY visit_minute, department_name;

SELECT date_trunc('HOUR', visit_time) visit_hour,
  department_name, 
  count(1)
FROM retail_logs_data
GROUP BY visit_hour, department_name;

SELECT date_trunc('DAY', visit_time) visit_day,
  department_name, 
  count(1)
FROM retail_logs_data
GROUP BY visit_day, department_name;
