Exercise 01 - Get monthly crime count by type

pyspark
spark-shell
apache-spark
scala
python

#24

Thank you @sneha_ananthan.
I follow the videos as well.

The thing I don’t understand is what he is referring to: is there a specific video about this?


#25

In the playlist you will find separate videos on saving files in different formats. He is referring to those.


#26

I have a basic question.

As per the problem, we have: “Get monthly count of primary crime type, sorted by month in ascending”.
Just going by the above, one might sort just by MM. But looking at the output format, we take the key as YYYYMM.

Do we get the problem description worded the same way in the exam too?
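
For what it’s worth, a minimal sketch of the difference, with a made-up date value (format assumed to be MM/DD/YYYY as in the crime CSV):

date = "01/15/2015"
int(date.split("/")[0])                        # 1      -> sorting by MM alone mixes years together
int(date.split("/")[2] + date.split("/")[0])   # 201501 -> the YYYYMM key shown in the expected output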


#27

Hi @panamare/@sneha_ananthan,
I just checked your message. What I meant was that you don’t have to worry about column headers. I followed the video series by Durga sir https://www.youtube.com/watch?v=UeOwsKLTwek&list=PLf0swTFhTI8q0x0V1E6We5zBQ9UazHFY0

Just take care of the file formats mentioned in the aforementioned series (such as JSON, Avro, etc.), but you don’t have to save the column headers. For example, if you are asked to save output in Parquet, then just save the output of dataFrame.write.parquet() at the desired path and that’s it. Nothing specific is required for column headers.
Hope I am able to help you out with your concern.
Thanks.
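
As a minimal sketch of the point above, run in the pyspark shell (where sqlContext is predefined); the data and output path are hypothetical, and nothing extra is needed for headers because Parquet stores the schema with the data:

df = sqlContext.createDataFrame([(201501, "THEFT", 100)], ["month", "crime_type", "count"])
df.write.parquet("/user/whoever/solutions/solution01/sample_parquet")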


#28

The way you split the date is nice.


#29

#python
1:

Get monthly crime count by type:

hdfs dfs -du -s -h /public/crime/csv

hdfs dfs -tail /public/crime/csv/crime_data.csv

Access the Resource Manager UI:

http://rm01.itversity.com:19288/cluster

#Launch pyspark with optimal resources

hdfs dfs -du -s -h /public/crime/csv

#note: no "dfs" subcommand here; fsck is invoked directly via hdfs
hdfs fsck -blocks -files -locations /public/crime/csv

pyspark --master yarn \
  --conf spark.ui.port=42136 \
  --num-executors 6 \
  --executor-cores 2 \
  --executor-memory 3G

#read data from hdfs into RDD
crimeData = sc.textFile("/public/crime/csv")
for i in crimeData.take(10): print(i)

#filter out the header
crimeDataHeader = crimeData.first()
crimeDataHeader
crimeDataNoHeader = crimeData.filter(lambda x: x != crimeDataHeader)
for i in crimeDataNoHeader.take(10):print(i)

#convert each element into tuple ((month, crime_type), 1)
def getCrimeDetails(x):
    data = x.split(",")
    dateWithTime = data[2]
    date = dateWithTime.split(" ")[0]
    month = int(date.split("/")[2] + date.split("/")[0])
    crimeType = str(data[5])
    return ((month, crimeType), 1)
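
A quick sanity check on one made-up line (illustrative values only; field positions assumed to match the crime CSV, with index 2 = Date and index 5 = Primary Type):

sample = "10000092,HY189866,03/18/2015 07:44:00 PM,047XX W OHIO ST,041A,BATTERY"
getCrimeDetails(sample)   # returns ((201503, 'BATTERY'), 1)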

crimeDataMap = crimeDataNoHeader.map(lambda x: getCrimeDetails(x))

for i in crimeDataMap.take(10): print(i)

#perform aggregation to get ((month, crime_type), count)
monthlyCrimeCountByType = crimeDataMap.reduceByKey(lambda x, y: x + y)

for i in monthlyCrimeCountByType.take(10): print(i)

#convert each element into
#((month, -count), month + "\t" + count + "\t" + crime_type)
monthlyCrimeCountByTypeMap = monthlyCrimeCountByType.map(lambda x: ((x[0][0], -x[1]), str(x[0][0]) + "\t" + str(x[1]) + "\t" + x[0][1]))

for i in monthlyCrimeCountByTypeMap.take(10): print(i)

#Sort the data
monthlyCrimeCountByTypeSorted = monthlyCrimeCountByTypeMap.sortByKey()

for i in monthlyCrimeCountByTypeSorted.take(10): print(i)
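
The negative count added to the key in the previous step is what drives this ordering: sortByKey() is ascending by default and Python tuples compare element-wise, so months come out ascending and, within a month, larger counts come first. A tiny standalone illustration with made-up values:

sample_keys = [(201501, -80), (201502, -300), (201501, -120)]
sorted(sample_keys)   # [(201501, -120), (201501, -80), (201502, -300)]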

#discard the key and get the values
monthlyCrimeCountValues = monthlyCrimeCountByTypeSorted.map(lambda x: x[1])

for i in monthlyCrimeCountValues.take(10): print(i)

#save the data in hdfs
#save as one or 2 files, use coalesce
monthlyCrimeCountValues.coalesce(2).saveAsTextFile("/user/rjshekar90/solutions/solution01/crimes_by_type_by_month", compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

#validation

use this:

for i in sc.textFile("/user/rjshekar90/solutions/solution01/crimes_by_type_by_month").take(10): print(i)

Don’t cat or tail the gzipped output directly in HDFS, as it shows garbled characters; you would have to copy the files back to the local file system (LFS) and unzip them first.

hdfs dfs -ls /user/rjshekar90/solutions/solution01/crimes_by_type_by_month
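
If you do want to inspect the gzipped files outside Spark, a minimal sketch of the copy-back-and-unzip step mentioned above (the local directory name is hypothetical):

hdfs dfs -get /user/rjshekar90/solutions/solution01/crimes_by_type_by_month /tmp/crimes_by_type_by_month
cd /tmp/crimes_by_type_by_month
gunzip part-*.gz
head part-00000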