Exercise 03 - Get top 3 crime types based on number of incidents in RESIDENCE area

pyspark
spark-shell
apache-spark
scala
python

#1

Before attempting these questions, make sure you prepare by going through appropriate material.



  • Details - Duration 15 to 20 minutes
    • Data is available in HDFS file system under /public/crime/csv
    • Structure of data (ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location)
      File format - text file
    • Delimiter - "," (use a regex while splitting, e.g. split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1), as some fields contain commas and are enclosed in double quotes; see the sketch after this list)
    • Get top 3 crime types based on number of incidents in RESIDENCE area using “Location Description”
    • Store the result in HDFS path /user/<YOUR_USER_ID>/solutions/solution03/RESIDENCE_AREA_CRIMINAL_TYPE_DATA
    • Output Fields: Crime Type, Number of Incidents
    • Output File Format: JSON
    • Output Delimiter: N/A
    • Output Compression: No
  • Validation
  • Solution
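
For reference, here is a quick PySpark illustration of why the quoted-field-aware regex matters. The sample line is made up and the variable names are only illustrative:

# Illustration only: a made-up record whose quoted field contains a comma.
import re

regex = re.compile(',(?=(?:[^"]*"[^"]*")*[^"]*$)')

sample = 'id1,case1,"SOME BLOCK, WITH COMMA",THEFT,RESIDENCE'
print(sample.split(','))    # a plain split breaks the quoted field into two pieces
print(regex.split(sample))  # the regex keeps the quoted field intact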

#4

In Scala using Data Frames

val cData = sc.textFile("/public/crime/csv").map(r => { val d = r.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1); (d(0), d(5), d(7)) })

case class crimeCC (
id : String,
ctype: String,
loc_desc: String
)

val crimeDF = cData.map(r => crimeCC(r._1, r._2, r._3 )).toDF

Spark SQL

crimeDF.registerTempTable("crimetable")

val resultSQL = sqlContext.sql("select ctype Crime_Type, count(id) number_of_incident from crimetable where loc_desc = 'RESIDENCE' group by ctype order by number_of_incident desc limit 3")

resultSQL.toJSON.repartition(1).saveAsTextFile("/user/cloudera/solutions/solution03/RESIDENCE_AREA_CRIMINAL_TYPE_DATA")

Spark DataFrame

val resultDF = crimeDF.filter("loc_desc = 'RESIDENCE'").groupBy(col("ctype").alias("Crime_Type")).agg(count(col("id")).alias("number_of_incident")).orderBy(col("number_of_incident").desc).limit(3)

resultDF.toJSON.repartition(1).saveAsTextFile("/user/cloudera/solutions/solution03/RESIDENCE_AREA_CRIMINAL_TYPE_DATA")


#5

val crimeDataRDD = sc.textFile("/public/crime/csv").
  mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) lines.drop(1) else lines
  })

val crimeRDD = crimeDataRDD.
  filter(_.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)(7) == "RESIDENCE").
  map(cd => {
    val data = cd.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)(5)
    (data, 1)
  }).
  reduceByKey(_ + _)

val crimeDF = crimeRDD.toDF("crime_type", "crime_count")

val top3CrimeTypesDF = crimeDF.orderBy($"crime_count".desc).limit(3)

top3CrimeTypesDF.coalesce(1).write.json("target_dir/RESIDENCE_AREA_CRIMINAL_TYPE_DATA")


#6

val crimes = sc.textFile("file:///home/cloudera/workspace/scala/rajudata/data-master/Crimes_-_2018.csv")

val header = crimes.first

val crimeFilter = crimes.filter(row => row!=header)

val crimeMappedDF = crimeFilter.map(ele => {
  val splitted = ele.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)
  val primeType = splitted(5)
  val location = splitted(7)
  (primeType, location)
}).filter(ele => ele._2 == "RESIDENCE").toDF("primeType", "location")

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

crimeMappedDF.registerTempTable("crimes")

val toprecords = sqlContext.sql("select primeType, count(1) as crimecount from crimes group by primeType order by crimecount desc limit 3")

toprecords.save("/user/dgadiraju/solutions/solution03/RESIDENCE_AREA_CRIMINAL_TYPE_DATA", "json")


#7

This solution does not take into consideration more than one crime type having the same count.
Ideally the count values should be loaded into a set (to remove duplicates), then converted to a list and sorted in descending order, then take(3).min.
Once you get that number, do a takeWhile or filter using value >= top3Min.
The result may have more than 3 crime types if any of these crime types have the same count.
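
A rough sketch of what I mean in PySpark (assuming crimeCounts is the (crime_type, count) pair RDD already computed in the earlier solutions; the names here are only illustrative):

# Tie-aware top 3: find the third-highest distinct count, then keep everything at or above it.
distinctCounts = sorted(set(crimeCounts.map(lambda x: x[1]).collect()), reverse=True)
top3Min = min(distinctCounts[:3])   # third-highest distinct count (or the lowest available if fewer than 3)
# Ties can make this return more than 3 crime types.
topCrimes = crimeCounts.filter(lambda x: x[1] >= top3Min).collect()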

Please clarify and confirm. Thanks


#8

Does the below result look correct? No one has posted, so I just want to confirm.

{"crime_type":"BATTERY","total_incidents":244394}
{"crime_type":"OTHER OFFENSE","total_incidents":184667}
{"crime_type":"THEFT","total_incidents":142273}


#9

Developed the following PySpark code:
data = sc.textFile("/public/crime/csv/crime_data.csv")
header = data.first()
data_filter = data.filter(lambda x: x != header).map(lambda x: str(x))
data_map = data_filter.map(lambda x: (x.split(",")[5], x.split(",")[7]))
data_map_filter = data_map.filter(lambda x: x[1] == 'RESIDENCE').map(lambda x: (x[0], 1))
result = data_map_filter.reduceByKey(lambda x, y: x + y)
result_sort = result.takeOrdered(3, key=lambda x: -x[1])
data_df = sc.parallelize(result_sort).toDF(["crime_type", "num_rec"])
data_df.coalesce(1).write.json("/user/sravyakalla/solution/solution3")

I did not use a regular expression while splitting the data, as I was receiving an index out of range error.
I used the regex in the following way:
data_map = data.map(lambda x: (x.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)[5], x.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)[7]))

I understand that it is not splitting properly. Please provide me insight on how to implement the split with regex.


#10

I got the same answer.


#11

I just used split(",") and it still worked. When I used the regex, I got exceptions.

crimeRaw = sc.textFile("/public/crime/csv")
exclude = crimeRaw.first()
crimeRawFiltered = crimeRaw.filter(lambda x : (x != exclude))
crimeResidence = crimeRawFiltered.filter(lambda x : x.split(",")[7]=="RESIDENCE")
crimeMap = crimeResidence.map(lambda x : (x.split(",")[5],1))
crimeMapTotal = crimeMap.reduceByKey(lambda x,y : x+y)
output = crimeMapTotal.takeOrdered(3, key=lambda x : -x[1])
df = sc.parallelize(output).toDF(["Crime Type","Number Of Incidents"])
df.coalesce(1).write.json("")

Output:
{"Crime Type":"BATTERY","Number Of Incidents":244394}
{"Crime Type":"OTHER OFFENSE","Number Of Incidents":184667}
{"Crime Type":"THEFT","Number Of Incidents":142273}


#12

@itversity

Please provide the solution in PySpark showing how to use the regex.

Thank you very much


#13

Sure. @kmln will take care of it very soon.


#14

This should work in pyspark for the regular expressions:

crimeRaw = sc.textFile('/public/crime/csv')

header = crimeRaw.first()

crime = crimeRaw.filter(lambda c: c != header)

from pyspark.sql import *
from pyspark.sql.functions import *
import re


regex = re.compile(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")

crime_DF = crime. \
map(lambda c:
	Row(crime_type=regex.split(c)[5], 
		location_desc=regex.split(c)[7]
	)
).toDF()

top3CrimesByType_DF = crime_DF. \
filter("location_desc like 'RESIDENCE%'"). \
groupBy('crime_type'). \
agg(count('crime_type').alias('num_incidents')). \
orderBy(desc('num_incidents')).limit(3)

top3CrimesByType_DF.show()

top3CrimesByType_DF.rdd.toDF(schema=['Crime Type', 'Number of Incidents']). \
write.json('/user/snehaananthan/solutions/solution03/RESIDENCE_AREA_CRIMINAL_TYPE_DATA')

#15

Won't 'RESIDENCE%' consider records for both values RESIDENCE and RESIDENCE-GARAGE in the crime_type count?
The question asks for RESIDENCE only. Please correct my understanding.
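
For example, an exact match instead of the pattern is what I had in mind, roughly like this sketch (assuming crime_DF, count and desc from the previous post; the names are only illustrative):

# Hypothetical exact-match variant of the earlier filter; matches only the value RESIDENCE.
residenceOnly_DF = crime_DF.filter("location_desc = 'RESIDENCE'")
residenceOnly_DF.groupBy('crime_type'). \
    agg(count('crime_type').alias('num_incidents')). \
    orderBy(desc('num_incidents')).limit(3).show()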


#16

Hi, the answer is no. If you try with that filter transformation, you will get nothing.