Exercise 03 - Get top 3 crime types based on number of incidents in RESIDENCE area

pyspark
apache-spark
spark-shell
scala
python

#23

This did it for me…

# read the raw CSV data and drop the header record
crimesRDD = sc.textFile('/public/crime/csv')
crimesRDDhead = crimesRDD.first()
crimesNoHeadRDD = crimesRDD.filter(lambda rec: rec != crimesRDDhead)

# split on commas that are outside double quotes (some fields contain embedded commas)
import re
exp = re.compile(',(?=(?:[^"]*"[^"]*")*[^"]*$)')

# keep only RESIDENCE incidents and emit (crime_type, 1) pairs
crimesFiltered = crimesNoHeadRDD.filter(lambda rec: exp.split(rec)[7] == 'RESIDENCE')
crimesFilteredmap = crimesFiltered.map(lambda rec: (exp.split(rec)[5], 1))
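
If you want to sanity-check the regex before running the full job, you can try it on a single record first. The sample below is made-up data just shaped like a crime row (it is not from the actual file), but it shows that a quoted field with an embedded comma stays in one piece:

sample = '10000092,HY189866,03/18/2015 07:44:00 PM,047XX W OHIO ST,041A,BATTERY,"AGGRAVATED: HANDGUN, NO INJURY",RESIDENCE,false'
exp.split(sample)[5]   # 'BATTERY'  (primary crime type)
exp.split(sample)[7]   # 'RESIDENCE'  (location description)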

===> you can go Spark Core or Spark SQL from here ----

SPARK CORE:
# aggregate the counts per crime type
crimesAgg = crimesFilteredmap.reduceByKey(lambda x, y: x + y)
# negate the count so sortByKey gives descending order, then flip back to (crime_type, count)
crimesAggsort = crimesAgg.map(lambda rec: (int(rec[1]) * -1, rec[0])).sortByKey().map(lambda rec: (rec[1], rec[0] * -1))
# take the top 3 and save as JSON text
crimetyptop3 = sc.parallelize(crimesAggsort.takeOrdered(3, key=lambda rec: rec[1] * -1))
from pyspark.sql import Row
crimetyptop3DF = crimetyptop3.map(lambda rec: Row(crime_type=rec[0], num_of_incidents=rec[1])).toDF()
crimetyptop3DF.toJSON().repartition(1).saveAsTextFile('/tmp/problem1')
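
To double-check the output, reading the same path back should print exactly three JSON records (quick check only, assuming the /tmp/problem1 path used above):

for rec in sc.textFile('/tmp/problem1').collect():
    print(rec)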

SPARK SQL:
from pyspark.sql import Row
# build a DataFrame of (crime_type, incident) rows and register it as a temp table
crimesDF = crimesFilteredmap.map(lambda rec: Row(crime_type=rec[0], incident=rec[1])).toDF()
crimesDF.registerTempTable('crimesTab')
sqlContext.sql('select crime_type, count(1) num_of_incidents from crimesTab group by crime_type order by num_of_incidents desc limit 3'). \
    write.save('/tmp/problem3json', format='json')
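
If you prefer to stay in the DataFrame API instead of writing SQL, the same aggregation can be expressed directly. This is just a sketch using the crimesDF built above; the output path here is a placeholder, not one from the exercise:

from pyspark.sql.functions import count

crimesDF.groupBy('crime_type') \
    .agg(count('incident').alias('num_of_incidents')) \
    .orderBy('num_of_incidents', ascending=False) \
    .limit(3) \
    .write.save('/tmp/problem3json_df', format='json')   # placeholder path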