Exercise 01 - Get monthly crime count by type



Thank you @sneha_ananthan. I follow the videos as well.

The thing I don't understand is what he is referring to: is there a specific video about this?


In the playlist you will find separate videos on saving files in different formats. He is referring to those.


I have a basic question.

As per the problem we have "Get monthly count of primary crime type, sorted by month in ascending".
Just going by that, one might sort by MM alone, but looking at the output format, we take the key as YYYYMM.

Do we get the problem description the same way in the exam too?
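For what it's worth, building the key as YYYYMM rather than MM is what makes the sort come out right across years. A minimal pure-Python sketch of that key construction (assuming the date field looks like "MM/DD/YYYY HH:MM:SS AM/PM", as in the crime data set):

```python
# Build a YYYYMM sort key from a crime record's date field.
# Assumes the date column is formatted like "01/15/2001 10:30:00 PM".
def month_key(date_with_time):
    date = date_with_time.split(" ")[0]   # "01/15/2001"
    mm, dd, yyyy = date.split("/")
    return int(yyyy + mm)                 # e.g. 200101

# Sorting by MM alone would interleave years; YYYYMM keeps them apart.
print(month_key("01/15/2001 10:30:00 PM"))   # 200101
print(month_key("12/03/2002 08:00:00 AM"))   # 200212
```

With this key, February 2001 (200102) sorts before January 2002 (200201), which a bare MM key would not give you.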


Hi @panamare/@sneha_ananthan,
I just checked your message. What I meant was that you don't have to worry about column headers. I followed the video series by Durga sir: https://www.youtube.com/watch?v=UeOwsKLTwek&list=PLf0swTFhTI8q0x0V1E6We5zBQ9UazHFY0

Just take care of the file formats as mentioned in the aforementioned series (such as JSON, Avro, etc.), but you don't have to save the column headers. For example, if you are asked to save the output in Parquet, then just save the output of "dataFrame.write.parquet()" at the desired path and that's it. Nothing specific is required for column headers.
Hope I am able to help you with your concern.


The way you split the date is nice.



Get monthly crime count by type:

hdfs dfs -du -s -h /public/crime/csv

hdfs dfs -tail /public/crime/csv/crime_data.csv

Access the Resource Manager UI:


#Launch pyspark with optimal resources

hdfs dfs -du -s -h /public/crime/csv

#no dfs here:
hdfs fsck /public/crime/csv -files -blocks -locations

pyspark --master yarn \
  --conf spark.ui.port=42136 \
  --num-executors 6 \
  --executor-cores 2 \
  --executor-memory 3G

#read data from hdfs into RDD
crimeData = sc.textFile("/public/crime/csv")
for i in crimeData.take(10): print(i)

#filter out the header
crimeDataHeader = crimeData.first()
crimeDataNoHeader = crimeData.filter(lambda x: x != crimeDataHeader)
for i in crimeDataNoHeader.take(10):print(i)

#convert each element into tuple ((month, crime_type), 1)
def getCrimeDetails(x):
    data = x.split(",")
    dateWithTime = data[2]
    date = dateWithTime.split(" ")[0]
    month = int(date.split("/")[2] + date.split("/")[0])
    crimeType = str(data[5])
    return ((month, crimeType), 1)

crimeDataMap = crimeDataNoHeader.map(lambda x: getCrimeDetails(x))

for i in crimeDataMap.take(10): print(i)

#perform aggregation to get ((month, crime_type), count)
monthlyCrimeCountByType = crimeDataMap.reduceByKey(lambda x, y: x + y)

for i in monthlyCrimeCountByType.take(10): print(i)

#convert each element into
#((month, -count), month + "\t" + count + "\t" + crime_type)
monthlyCrimeCountByTypeMap = monthlyCrimeCountByType.map(lambda x: ((x[0][0], -x[1]), str(x[0][0]) + "\t" + str(x[1]) + "\t" + x[0][1]))

for i in monthlyCrimeCountByTypeMap.take(10): print(i)

#Sort the data
monthlyCrimeCountByTypeSorted = monthlyCrimeCountByTypeMap.sortByKey()

for i in monthlyCrimeCountByTypeSorted.take(10): print(i)
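The (month, -count) key used above relies on tuple ordering: a single ascending sort on the composite key gives months ascending and, within each month, counts descending. A pure-Python illustration of the same trick (the sample records are made up):

```python
# Composite-key trick: sorting by (month, -count) ascending yields
# months in ascending order and counts descending within each month.
records = [((200101, "THEFT"), 7061),
           ((200101, "BATTERY"), 5125),
           ((200102, "THEFT"), 6500),
           ((200101, "ASSAULT"), 9000)]

sorted_recs = sorted(records, key=lambda kv: (kv[0][0], -kv[1]))
for (month, ctype), count in sorted_recs:
    # Prints the three 200101 records by descending count, then 200102.
    print(month, count, ctype)
```

This is why sortByKey() alone is enough: the descending order on count is baked into the key by negating it.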

#discard the key and get the values
monthlyCrimeCountValues = monthlyCrimeCountByTypeSorted.map(lambda x: x[1])

for i in monthlyCrimeCountValues.take(10): print(i)

#save the data in hdfs
#save as one or 2 files, use coalesce
monthlyCrimeCountValues.coalesce(2).saveAsTextFile("/user/rjshekar90/solutions/solution01/crimes_by_type_by_month", compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")


Use this to verify:

for i in sc.textFile("/user/rjshekar90/solutions/solution01/crimes_by_type_by_month").take(10): print(i)

Don't view the compressed output directly (e.g. with hdfs dfs -cat or -tail), as it shows garbled characters; you would have to copy the files back to the local file system and unzip them first.
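If you do copy a part file back to the local file system, Python's gzip module can read it without a separate unzip step. A small sketch (the file name and contents here are made up to simulate a copied-back part file):

```python
import gzip
import os
import tempfile

# Simulate a gzip-compressed part file copied back from HDFS.
path = os.path.join(tempfile.gettempdir(), "part-00000.gz")
with gzip.open(path, "wt") as f:
    f.write("200101\t7061\tTHEFT\n")

# Read it the way you would inspect Spark's compressed text output.
with gzip.open(path, "rt") as f:
    for line in f:
        print(line.rstrip("\n"))
```

`gzip.open(..., "rt")` decompresses transparently, so there is no need to run gunzip first just to spot-check a few lines.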

hdfs dfs -ls /user/rjshekar90/solutions/solution01/crimes_by_type_by_month


Why are we using substring(date, 0, 2) in this question to extract the month? I believe the index position starts from 1, not 0?
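As far as I know, Hive/Spark SQL substr is indeed 1-based, but a start position of 0 is treated the same as 1, so substr(date, 0, 2) and substr(date, 1, 2) both return the first two characters. A rough Python model of that behaviour (sql_substr is my own illustrative helper, not a Spark API):

```python
# Hypothetical helper mimicking Hive/Spark SQL substr semantics:
# positions are 1-based, a start of 0 behaves like a start of 1,
# and a negative start counts back from the end of the string.
def sql_substr(s, pos, length):
    if pos == 0:
        pos = 1
    if pos > 0:
        start = pos - 1
    else:
        start = len(s) + pos
    return s[start:start + length]

print(sql_substr("01/15/2001", 0, 2))  # "01"
print(sql_substr("01/15/2001", 1, 2))  # "01"
print(sql_substr("01/15/2001", 7, 4))  # "2001"
```

So substr(date, 0, 2) happens to work, but substr(date, 1, 2) says what is meant and is the safer habit.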


Using spark-shell:

val crimefile = sc.textFile("/user/test/crimes-2018.csv")
val first = crimefile.first

val crimes = crimefile.filter(x => x != first)
val crimmap = crimes.map(x => {
  val splitted = x.split(",")
  val monthsplit = splitted(2).trim.split("/")
  val month = monthsplit(2).split(" ")(0) + monthsplit(0)
  val ctype = splitted(5).trim
  (month, ctype, 1)
})

val cdf = crimmap.toDF("month", "ctype", "cnt")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
cdf.registerTempTable("crimes")

val result = sqlContext.sql("select month, sum(cnt) as total, ctype from crimes group by month, ctype order by month asc, total desc")

val crimerdd = result.rdd.map(x => {
  val month = x.getString(0)
  val total = x.getLong(1)
  val ctype = x.getString(2)
  month + "\t" + total + "\t" + ctype
})



Do you find any error in the code? My output is different from what is shown in the video.

sqlContext.sql("select cast(concat(substr(date,7,4), substr(date,0,2)) as int) crime_date, crime_type, count(1) count_crime_type " +
"from crim group by cast(concat(substr(date,7,4), substr(date,0,2)) as int), crime_type " +
"order by crime_date, count_crime_type desc").show

I’m getting this output:

|crime_date| crime_type|count_crime_type|
| 200101| SIMPLE| 7061|
| 200101| $500 AND UNDER| 5125|
| 200101| OVER $500| 2401|
| 200101| TO VEHICLE| 2017|
| 200101| TO PROPERTY| 1952|
| 200101| AUTOMOBILE| 1518|
| 200101| POSS: CRACK| 1460|
| 200101| FORCIBLE ENTRY| 1452|
| 200101|POSS: CANNABIS 30…| 1190|
| 200101| TELEPHONE THREAT| 979|
| 200101|HARASSMENT BY TEL…| 919|
| 200101| POSS: HEROIN(WHITE)| 876|


I appreciate this platform a lot. However, for someone like me who is not yet an expert in coding, are there solutions available to these practice questions so I can compare them to mine? In Python, please.

With great regards,

Thank you


Comparing Python and Scala, can someone sincerely recommend which is easier and simpler for the CCA 175 exam, please?
Your sincerity will be highly appreciated. Thank you.


get the file in hdfs
hadoop fs -copyFromLocal /data/crime /user/user_id/crime_test

read hdfs file
crimeFileRdd = sc.textFile("/user/user_id/crime_test")

Filter header
crimeFileWoHeader = crimeFileRdd.filter(lambda i: i.split(",")[0] != 'ID')

convert to dataframe
from pyspark.sql import Row
crimeFileDf = crimeFileWoHeader.map(lambda i: Row(crime_date=i.split(",")[2].split(" ")[0].split("/")[2] + i.split(",")[2].split(" ")[0].split("/")[0], crime_type=i.split(",")[5])).toDF()

register as temp table
crimeFileDf.registerTempTable("crimeTable")

use sql query to get result
crimeResultSet = sqlContext.sql("select crime_date, crime_type, count(*) as count from crimeTable group by crime_date, crime_type order by crime_date, count desc")

Convert dataframe into RDD
crimeResultRDD = crimeResultSet.rdd.map(lambda i: str(i[0]) + '\t' + str(i[2]) + '\t' + i[1]).coalesce(1)

save the file to hdfs
crimeResultRDD.saveAsTextFile("/user/user_id/solutions/solution01/crimes_by_type_by_month", compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

Core API
get the file in hdfs
hadoop fs -copyFromLocal /data/crime /user/user_id/crime_test

read hdfs file
crimeFileRdd = sc.textFile("/user/user_id/crime_test")

Filter header
crimeFileWoHeader = crimeFileRdd.filter(lambda i: i.split(",")[0] != 'ID')

convert to desired format
crimeFile = crimeFileWoHeader.map(lambda i: ((int(i.split(",")[2].split(" ")[0].split("/")[2] + i.split(",")[2].split(" ")[0].split("/")[0]), i.split(",")[5]), 1))

count by key to get crime counts per month
from operator import add
crimeFileCount = crimeFile.reduceByKey(add)

reformat data to sort by month and count
crimeFileMap = crimeFileCount.map(lambda i: ((int(i[0][0]), -int(i[1])), str(i[0][0]) + "\t" + str(i[1]) + "\t" + i[0][1]))

sort by key
crimeSortedFile = crimeFileMap.sortByKey()

Final file as per required format
finalCrimeFile = crimeSortedFile.map(lambda i: i[1]).coalesce(1)

save the file to hdfs
finalCrimeFile.saveAsTextFile("/user/user_id/solutions1/solution01/crimes_by_type_by_month", compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")



crime_filter_header = crime_rdd.filter(lambda x: x.split(",")[0] != 'ID')

def date_conv(rec):
    datewithts = rec.split(",")[2]
    dt1 = datewithts.split(" ")[0].split("/")
    return int(dt1[2] + dt1[0])

#Invoke function to convert date format

crime_date_convert = crime_filter_header.map(lambda rec: (date_conv(rec), rec.split(",")[5]))
crime_tuple = crime_date_convert.map(lambda x: (x, 1))
crime_count = crime_tuple.reduceByKey(lambda x, y: x + y)
crimesortemp = crime_count.map(lambda x: ((x[0][0], -x[1]), str(x[0][0]) + "\t" + str(x[1]) + "\t" + x[0][1]))
CrimeSort = crimesortemp.sortByKey()
CrimeSortMap = CrimeSort.map(lambda x: x[1])
CrimeSortMap.saveAsTextFile("/user/cloudera/solutions/solution01/crimes_by_type_by_month", compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")



crime_record_with_no_hdr = crime_rdd.filter(lambda x: x.split(",")[0] != 'ID')

Extract only date and crime type , and convert to DataFrame

from pyspark.sql import Row
crimeDF = crime_record_with_no_hdr.map(lambda x: Row(crime_date=x.split(",")[2], crime_type=x.split(",")[5])).toDF()
crimeDF.registerTempTable("crime")
format_date = sqlContext.sql("select concat(substr(crime_date,7,4), substr(crime_date,1,2)) as crime_date_formatted, crime_type from crime")
format_date.registerTempTable("crime_new")
final_crime_output = sqlContext.sql("select crime_date_formatted, count(*) as crime_count, crime_type from crime_new group by crime_date_formatted, crime_type order by crime_date_formatted asc, crime_count desc")
CrimeResultRDD = final_crime_output.rdd.map(lambda x: str(x[0]) + "\t" + str(x[1]) + "\t" + str(x[2]))
CrimeResultRDD.coalesce(2).saveAsTextFile("/user/cloudera/solutions/solution01/crimes_by_type_by_month", compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")