Exercise 03 - Get top 3 crime types based on number of incidents in RESIDENCE area

Tags: spark-shell, python, scala, apache-spark, pyspark
#23

This did it for me…

crimesRDD = sc.textFile('/public/crime/csv')
crimesRDDhead = crimesRDD.first()
crimesNoHeadRDD = crimesRDD.filter(lambda rec: rec != crimesRDDhead)
import re
# split on commas that sit outside double-quoted fields
exp = re.compile(',(?=(?:[^"]*"[^"]*")*[^"]*$)')
crimesFiltered = crimesNoHeadRDD.filter(lambda rec: exp.split(rec)[7] == 'RESIDENCE')
crimesFilteredmap = crimesFiltered.map(lambda rec: (exp.split(rec)[5], 1))
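
The lookahead in that regex only matches commas followed by an even number of quotes up to the end of the line, i.e. commas outside quoted fields. That matters because some columns (like the block address) contain embedded commas. A quick sanity check on a made-up record:

sample = '10000092,HY189866,"047XX W OHIO ST, CHICAGO",0486'
exp.split(sample)
# ['10000092', 'HY189866', '"047XX W OHIO ST, CHICAGO"', '0486']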

===> you can go Spark Core or Spark SQL from here ----

SPARK CORE:
crimesAgg = crimesFilteredmap.reduceByKey(lambda x, y: x + y)
crimesAggsort = crimesAgg.map(lambda rec: (int(rec[1]) * -1, rec[0])).sortByKey().map(lambda rec: (rec[1], rec[0] * -1))
crimetyptop3 = sc.parallelize(crimesAggsort.takeOrdered(3, key=lambda rec: rec[1] * -1))
from pyspark.sql import Row
crimetyptop3DF = crimetyptop3.map(lambda rec: Row(crime_type=rec[0], num_of_incidents=rec[1])).toDF()
crimetyptop3DF.toJSON().repartition(1).saveAsTextFile('/tmp/problem1')
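
If you want to double-check what landed in HDFS, the JSON can be read straight back into a DataFrame (just a sanity check, using the same path as the save above):

sqlContext.read.json('/tmp/problem1').show()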

SPARK SQL:
from pyspark.sql import Row
crimesDF = crimesFilteredmap.map(lambda rec: Row(crime_type=rec[0], incident=rec[1])).toDF()
crimesDF.registerTempTable('crimesTab')
sqlContext.sql('select crime_type, count(1) num_of_incidents from crimesTab group by crime_type order by num_of_incidents desc limit 3'). \
    write.save('/tmp/problem3json', format='json')
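
For what it's worth, the same result can come from the DataFrame API instead of a temp table; a sketch using the crimesDF above (the output path here is just an example):

from pyspark.sql.functions import count, col
crimesDF.groupBy('crime_type'). \
    agg(count('incident').alias('num_of_incidents')). \
    orderBy(col('num_of_incidents').desc()). \
    limit(3). \
    write.save('/tmp/problem3json_df', format='json')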


#24

Hi,

Can you please share the output for problem #3? Is my approach and output correct?

crime = spark.read.format("csv"). \
    option("header", "true"). \
    option("inferSchema", "true"). \
    option("quote", '"'). \
    load("/public/crime/csv/crime_data.csv")

crimeFil = crime.filter("`Location Description` = 'RESIDENCE'")

crimeFil.registerTempTable("crimeT")

spark.sql("select `Primary Type`, count(*) as `Number of Incidents` from crimeT group by `Primary Type` order by count(*) desc limit 3").show()

+-------------+-------------------+
| Primary Type|Number of Incidents|
+-------------+-------------------+
|      BATTERY|             244394|
|OTHER OFFENSE|             184667|
|        THEFT|             142273|
+-------------+-------------------+
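
(The next reply confirms these counts and notes the output has to be in JSON. The same query can also be saved directly instead of shown; a sketch with an assumed output path, renaming the columns so the JSON keys contain no spaces:)

spark.sql("select `Primary Type` as crime_type, count(*) as num_of_incidents from crimeT group by `Primary Type` order by num_of_incidents desc limit 3"). \
    coalesce(1). \
    write.json('/tmp/solution03')  # output path is just an example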


#25

My results are the same as yours. Per the problem, the output has to be in JSON. This is how I am doing it.

// RDD[(String, Int)]
// (String, Int) = (MOTOR VEHICLE THEFT,2612)
val crimeCountsRdd = sc.
  textFile("/public/crime/csv").
  filter(rec => rec.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)(7).equals("RESIDENCE")).
  map(rec => {
    val arr = rec.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)
    val crimeType = arr(5)
    (crimeType, 1)
  }).
  reduceByKey((tot, elem) => tot + elem)
val crimesSortedRdd = crimeCountsRdd.sortBy(rec => -rec._2)
// take on an RDD returns an array
// Array((BATTERY,244394), (OTHER OFFENSE,184667), (THEFT,142273))
val top3Arr = crimesSortedRdd.take(3)
// convert the array back to an RDD before you can write to HDFS
val top3Rdd = sc.parallelize(top3Arr)
// convert the RDD to a DataFrame so that we can use the toJSON function
top3Rdd.
  toDF("crime_type", "num_incidents").
  toJSON. // here it becomes an RDD again
  coalesce(1). // we have just 3 records, so coalesce(1) to write a single file
  saveAsTextFile("/user/devfactor/solutions/solution03/RESIDENCE_AREA_CRIMINAL_TYPE_DATA")

Output is:
{"crime_type":"BATTERY","num_incidents":244394}
{"crime_type":"OTHER OFFENSE","num_incidents":184667}
{"crime_type":"THEFT","num_incidents":142273}
