Exercise 03 - Get top 3 crime types based on number of incidents in RESIDENCE area

Before attempting these questions, make sure you prepare by going through appropriate material.


  • Details - Duration 15 to 20 minutes
    • Data is available in HDFS file system under /public/crime/csv
    • Structure of data (ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location)
      File format - text file
    • Delimiter - "," (use a regex while splitting, split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1), as some fields contain commas and are enclosed in double quotes)
    • Get top 3 crime types based on number of incidents in RESIDENCE area using “Location Description”
    • Store the result in HDFS path /user/<YOUR_USER_ID>/solutions/solution03/RESIDENCE_AREA_CRIMINAL_TYPE_DATA
    • Output Fields: Crime Type, Number of Incidents
    • Output File Format: JSON
    • Output Delimiter: N/A
    • Output Compression: No
  • Validation
  • Solution
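Before the solutions below, here is a minimal plain-Python sketch of what the lookahead regex from the delimiter note does (the sample line is made up for illustration, not real crime data):

```python
import re

# Lookahead regex from the exercise: split on a comma only when an even
# number of double quotes follows it, i.e. the comma is outside quotes.
pattern = re.compile(r',(?=(?:[^"]*"[^"]*")*[^"]*$)')

line = '1,HY123,"AGGRAVATED: HANDGUN, ARMED",RESIDENCE'
fields = pattern.split(line)
print(fields)
# ['1', 'HY123', '"AGGRAVATED: HANDGUN, ARMED"', 'RESIDENCE']
```

The comma inside the quoted description is not treated as a delimiter, so field positions 5 (Primary Type) and 7 (Location Description) stay stable.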

In Scala using Data Frames

val cData = sc.textFile("/public/crime/csv").map(r => { var d = r.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1); (d(0), d(5), d(7)) })

case class crimeCC (
id : String,
ctype: String,
loc_desc: String
)

val crimeDF = cData.map(r => crimeCC(r._1, r._2, r._3 )).toDF

Spark SQL


crimeDF.registerTempTable("crimetable")

val resultSQL = sqlContext.sql("select ctype Crime_Type, count(id) number_of_incident from crimetable where loc_desc = 'RESIDENCE' group by ctype order by number_of_incident desc limit 3")


Spark DataFrame

val resultDF = crimeDF.filter("loc_desc = 'RESIDENCE'").groupBy(col("ctype").alias("Crime_Type")).agg(count(col("id")).alias("number_of_incident")).orderBy(col("number_of_incident").desc).limit(3)



val crimeDataRDD = sc.textFile("/public/crime/csv").
mapPartitionsWithIndex((idx, lines) => {
if (idx == 0) lines.drop(1) else lines })

val crimeRDD = crimeDataRDD.filter(_.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)(7) == "RESIDENCE").
map(cd => {
val data = cd.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)(5)
(data, 1)
}).reduceByKey(_ + _)

val crimeDF = crimeRDD.toDF("crime_type", "crime_count")

val top3crimeTypesDF = crimeDF.orderBy($"crime_count".desc).limit(3)



val crimes = sc.textFile("file:///home/cloudera/workspace/scala/rajudata/data-master/Crimes_-_2018.csv")

val header = crimes.first

val crimeFilter = crimes.filter(row => row!=header)

val crimeMappedDF = crimeFilter.map(ele => {
val splitted = ele.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)
val primeType = splitted(5)
val location = splitted(7)
(primeType, location)
}).filter(ele => ele._2 == "RESIDENCE").toDF("primeType", "location")

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

crimeMappedDF.registerTempTable("crimes")

val toprecords = sqlContext.sql("select primeType, count(1) as crimecount from crimes group by primeType order by crimecount desc limit 3")

toprecords.save("/user/dgadiraju/solutions/solution03/RESIDENCE_AREA_CRIMINAL_TYPE_DATA", "json")

This solution does not take into consideration more than one crime type having the same count.
Ideally the count values should be loaded into a set (to remove duplicates), converted to a list, and sorted in descending order; then take(3).min gives the third-highest count.
Once you have that number, do a takeWhile or filter using value >= top3Min.
The result may then have more than 3 crime types if any of these crime types share the same count.

Please clarify and confirm. Thanks
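The tie-handling idea above can be sketched in plain Python on a toy list of (crime_type, count) pairs (the names and counts here are made up; in Spark you would collect the counts first):

```python
# Hypothetical (crime_type, count) pairs; BATTERY and ASSAULT tie at 300.
counts = [("THEFT", 500), ("BATTERY", 300), ("ASSAULT", 300),
          ("FRAUD", 100), ("ARSON", 50)]

# Distinct counts, sorted descending; the min of the top 3 is the cutoff.
top3_min = sorted(set(c for _, c in counts), reverse=True)[:3]
cutoff = min(top3_min)

# Keep every crime type at or above the cutoff; this can yield more than
# 3 rows when counts tie, which is exactly the point being raised.
top_crimes = [(t, c) for t, c in counts if c >= cutoff]
print(top_crimes)
```

With the sample data the cutoff is 100 and four crime types survive, showing how ties expand the result beyond a plain limit(3).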

Does the result below look correct? No one has posted, so I just want to confirm.

{"crime_type":"OTHER OFFENSE","total_incidents":184667}

I developed the following PySpark code:
data = sc.textFile("/public/crime/csv/crime_data.csv")
header = data.first()
data_filter = data.filter(lambda x: x != header)
data_map = data_filter.map(lambda x: (x.split(",")[5], x.split(",")[7]))
data_map_filter = data_map.filter(lambda x: x[1] == 'RESIDENCE').map(lambda x: (x[0], 1))
result = data_map_filter.reduceByKey(lambda x, y: x + y)
result_sort = result.takeOrdered(3, key=lambda x: -x[1])
data_df = sc.parallelize(result_sort).toDF(["crime_type", "num_rec"])

I did not use a regular expression while splitting the data, as I was receiving an index-out-of-range error.
I used the regex in the following way:
data_map = data.map(lambda x: (x.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)[5], x.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)[7]))

I understand that it is not splitting properly. Please provide me insight on how to implement the split with regex.
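For what it's worth, the index-out-of-range error above most likely comes from calling the string method split, which treats the pattern as literal text rather than as a regex; re.split is what applies it as a pattern. A quick plain-Python check (with a made-up line) shows the difference:

```python
import re

pattern = ',(?=(?:[^"]*"[^"]*")*[^"]*$)'
line = '1,HY123,"AGGRAVATED: HANDGUN, ARMED",RESIDENCE'

# str.split looks for the literal text of the pattern, finds nothing,
# and returns the whole line as a single element -- hence index errors.
print(len(line.split(pattern)))      # 1

# re.split treats the pattern as a regex and splits on unquoted commas.
print(len(re.split(pattern, line)))  # 4
```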


I got the same answer.

I just used split(",") and it still worked. When I used the regex, I got exceptions.

crimeRaw = sc.textFile("/public/crime/csv")
exclude = crimeRaw.first()
crimeRawFiltered = crimeRaw.filter(lambda x : (x != exclude))
crimeResidence = crimeRawFiltered.filter(lambda x : x.split(",")[7]=="RESIDENCE")
crimeMap = crimeResidence.map(lambda x : (x.split(",")[5],1))
crimeMapTotal = crimeMap.reduceByKey(lambda x,y : x+y)
output = crimeMapTotal.takeOrdered(3, key=lambda x : -x[1])
df = sc.parallelize(output).toDF(["Crime Type","Number Of Incidents"])

{"Crime Type":"BATTERY","Number Of Incidents":244394}
{"Crime Type":"OTHER OFFENSE","Number Of Incidents":184667}
{"Crime Type":"THEFT","Number Of Incidents":142273}
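A plain split(",") happening to work here depends on fields 5 and 7 landing in the same place for the rows that matter; once a quoted field contains a comma, the later columns shift. A quick plain-Python check (with a made-up line) shows where the naive split drifts:

```python
import re

line = '1,HY123,"AGGRAVATED: HANDGUN, ARMED",RESIDENCE'

naive = line.split(',')  # the quoted comma produces an extra bogus field
quoted_aware = re.split(r',(?=(?:[^"]*"[^"]*")*[^"]*$)', line)

print(naive[3])         # ' ARMED"' -- wrong field
print(quoted_aware[3])  # 'RESIDENCE'
```

So both approaches can produce the same top-3 answer on this dataset, but only the regex split is safe against quoted commas in general.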



Please provide the solution in PySpark showing how we have to use the regex.

Thank you very much

Sure. @kmln will take care of it very soon.

This should work in pyspark for the regular expressions:

crimeRaw = sc.textFile('/public/crime/csv')

header = crimeRaw.first()

crime = crimeRaw.filter(lambda c: c != header)

from pyspark.sql import *
from pyspark.sql.functions import *
import re

regex = re.compile(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")

crime_DF = crime. \
map(lambda c: Row(crime_type=regex.split(c)[5], location_desc=regex.split(c)[7])). \
toDF()
top3CrimesByType_DF = crime_DF. \
filter("location_desc like 'RESIDENCE%'"). \
groupBy('crime_type'). \
agg(count('crime_type').alias('num_incidents')). \
orderBy(col('num_incidents').desc()). \
limit(3)

top3CrimesByType_DF.rdd.toDF(schema=['Crime Type', 'Number of Incidents']). \
write.json('/user/<YOUR_USER_ID>/solutions/solution03/RESIDENCE_AREA_CRIMINAL_TYPE_DATA')
Won't 'RESIDENCE%' consider records for both values RESIDENCE & RESIDENCE-GARAGE in the crime_type count?
The question asks for RESIDENCE. Please correct my understanding.

Hi, the answer is no. If you try it with that filter transformation, you will get nothing.

Can someone explain why we should use the below regex? Also I see that split has been used with a second argument, as in split("...", -1). What is the meaning of -1 here?

Also, can someone explain the regex format ,(?=(?:[^"]*"[^"]*")*[^"]*$)?

I got the same error. While using Python, you can use the regex with the following code:

import re
crimeInRESIDENCE = crimeFilter.filter(lambda r: re.split(r',(?=(?:[^"]*"[^"]*")*[^"]*$)', r)[7] == 'RESIDENCE')
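On the -1 question asked earlier: in Java/Scala, String.split with a negative limit keeps trailing empty fields instead of dropping them, which keeps the column count stable when fields like Latitude/Longitude are empty at the end of a row. Python's re.split keeps trailing empties by default, as a small check shows (the row below is made up):

```python
import re

# A row whose last two fields are empty.
line = '1,THEFT,,'

fields = re.split(',', line)
# Python keeps the trailing empty strings, so indexing stays stable;
# Java/Scala String.split would drop them unless the limit is negative.
print(fields)  # ['1', 'THEFT', '', '']
```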

yup. Got the same result.

Can anyone please help me with how to get/download the data (CSV file)?


read file
crimeRdd=sc.textFile("/public/crime/csv")

remove header field
crimeWoH=crimeRdd.filter(lambda i: i.split(",")[0]!='ID')

convert into dataframe
from pyspark.sql import Row
crimeDf=crimeWoH.map(lambda i:Row(crime_type=i.split(",")[5],crime_loc=i.split(",")[7])).toDF()

register temp table
crimeDf.registerTempTable("crimeTable")

use query to fetch top 3 crime types
fetchedCrimes=sqlContext.sql("select crime_type,count(*) as count from crimeTable where crime_loc='RESIDENCE' group by crime_type order by count desc limit 3").coalesce(1)

save in HDFS path in JSON format

Core API


remove header field
crimeWoH=crimeRdd.filter(lambda i:i.split(",")[0]!="ID")

filter crimes for RESIDENCE
crimeResidence=crimeWoH.filter(lambda i:i.split(",")[7]=="RESIDENCE")

map as (key,value) pair
crimeKeyValue=crimeResidence.map(lambda i:(i.split(",")[5],1))

reduce by crime type
from operator import add
crimeReduce=crimeKeyValue.reduceByKey(add)

pick top 3
crimeTop3=crimeReduce.top(3,key=lambda i:i[1])[:3]

convert list into RDD

convert into dataframe

save to hdfs as JSON

Hi All

import re
regex = re.compile(r',(?=(?:[^"]*"[^"]*")*[^"]*$)')
dataM = dataNoHeader.filter(lambda x: regex.split(x)[7] == 'RESIDENCE')
#for i in dataM.take(10): print(i)
This works for me