Exercise 01 - Get monthly crime count by type

pyspark
spark-shell
apache-spark
scala
python

#24

Thank you @sneha_ananthan.
I follow the videos as well.

The thing I don’t understand is what he is referring to: is there a specific video about this?


#25

In the playlist you will find separate videos on saving files in different formats. He is referring to those.


#26

I have a basic question.

As per the problem we have: “Get monthly count of primary crime type, sorted by month in ascending”.
Going just by that wording, one might sort by MM alone. But looking at the expected output format, the key we take is actually YYYYMM.
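
For reference, this is roughly how the YYYYMM key gets built from the date column (a minimal pyspark sketch; it assumes the MM/DD/YYYY HH:MM:SS date format used in this dataset, and the sample value is made up):

# assumed date format: MM/DD/YYYY HH:MM:SS
dateWithTime = "01/15/2001 09:30:00"  # hypothetical sample value
date = dateWithTime.split(" ")[0]                        # "01/15/2001"
monthKey = int(date.split("/")[2] + date.split("/")[0])  # 200101, i.e. YYYYMM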

Do we get the problem description worded the same way in the exam too?


#27

Hi @panamare/@sneha_ananthan,
I just checked your message. what I meant was that you don’t have to worry about column headers. I followed the video series by Durga sir https://www.youtube.com/watch?v=UeOwsKLTwek&list=PLf0swTFhTI8q0x0V1E6We5zBQ9UazHFY0

Just take care of the file formats as mentioned in the aforementioned series (such as - JSON, AVRO etc) but you don’t have to save the column headers. Ex - If you are asked to save output in PARQUET then just save the output of “dataFrame.write.parquet()” at the desired path and that’s it. Nothing specific required for column headers.
Hope, am able to help you out with your concern.
Thanks.
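
To illustrate, a minimal pyspark sketch (the DataFrame name resultDF and the output paths here are hypothetical; use whatever the question specifies):

# no extra handling is needed for column headers in these formats
resultDF.write.parquet("/user/whoever/solutions/solution01/parquet")   # PARQUET
resultDF.write.json("/user/whoever/solutions/solution01/json")         # JSON
resultDF.write.format("com.databricks.spark.avro").save("/user/whoever/solutions/solution01/avro")  # AVRO, if the spark-avro package is available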


#28

The way you split the date is nice.


#29

#python
Solution 1:

Get monthly crime count by type:

hdfs dfs -du -s -h /public/crime/csv

hdfs dfs -tail /public/crime/csv/crime_data.csv

Access the Resource Manager UI:

http://rm01.itversity.com:19288/cluster

#Launch pyspark with optimal resources

hdfs dfs -du -s -h /public/crime/csv

#note: fsck is not an 'hdfs dfs' subcommand:
hdfs fsck -blocks -files -locations /public/crime/csv

pyspark --master yarn \
  --conf spark.ui.port=42136 \
  --num-executors 6 \
  --executor-cores 2 \
  --executor-memory 3G

#read data from hdfs into RDD
crimeData = sc.textFile("/public/crime/csv")
for i in crimeData.take(10): print(i)

#filter out the header
crimeDataHeader = crimeData.first()
crimeDataHeader
crimeDataNoHeader = crimeData.filter(lambda x: x != crimeDataHeader)
for i in crimeDataNoHeader.take(10): print(i)

#convert each element into tuple ((month, crime_type), 1)
def getCrimeDetails(x):
    data = x.split(",")
    dateWithTime = data[2]
    date = dateWithTime.split(" ")[0]
    month = int(date.split("/")[2] + date.split("/")[0])
    crimeType = str(data[5])
    return ((month, crimeType), 1)

crimeDataMap = crimeDataNoHeader. \
    map(lambda x: getCrimeDetails(x))

for i in crimeDataMap.take(10): print(i)

#perform aggregation to get ((month, crime_type), count)
monthlyCrimeCountByType = crimeDataMap. \
    reduceByKey(lambda x, y: x + y)

for i in monthlyCrimeCountByType.take(10): print(i)

#convert each element into
#((month, -count), month + "\t" + count + "\t" + crime_type)
monthlyCrimeCountByTypeMap = monthlyCrimeCountByType. \
    map(lambda x: ((x[0][0], -x[1]), str(x[0][0]) + "\t" + str(x[1]) + "\t" + x[0][1]))

for i in monthlyCrimeCountByTypeMap.take(10): print(i)

#Sort the data
monthlyCrimeCountByTypeSorted = monthlyCrimeCountByTypeMap.sortByKey()

for i in monthlyCrimeCountByTypeSorted.take(10): print(i)

#discard the key and get the values
monthlyCrimeCountValues = monthlyCrimeCountByTypeSorted. \
    map(lambda x: x[1])

for i in monthlyCrimeCountValues.take(10): print(i)

#save the data in hdfs
#save as one or 2 files, use coalesce
monthlyCrimeCountValues.coalesce(2). \
    saveAsTextFile("/user/rjshekar90/solutions/solution01/crimes_by_type_by_month", compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

#validation

Use this to read the output back through Spark:

for i in sc.textFile("/user/rjshekar90/solutions/solution01/crimes_by_type_by_month").take(10):
    print(i)

Don’t cat the compressed files directly, as the gzipped output shows up as garbled characters; you would have to copy them back to the local file system and unzip them to read them that way.

hdfs dfs -ls /user/rjshekar90/solutions/solution01/crimes_by_type_by_month


#30

Why are we using substring(date, 0, 2) in this question to extract the month? I believe the index position starts from 1, not 0?
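
For what it’s worth, in Hive and Spark SQL the substr/substring functions treat a start position of 0 the same as 1, so substring(date, 0, 2) and substring(date, 1, 2) both return the first two characters, i.e. the MM part of an MM/DD/YYYY date. A quick check in pyspark (the date literal is just a made-up sample):

sqlContext.sql("select substr('01/15/2001 09:30:00', 0, 2), substr('01/15/2001 09:30:00', 1, 2)").show()
# both columns come back as 01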


#31

Using spark-shell

val crimefile = sc.textFile("/user/test/crimes-2018.csv")
val first = crimefile.first

val crimes = crimefile.filter(x=>x!=first)
val crimmap = crimes.map(x => {
  val splitted = x.split(",")
  val monthsplit = splitted(2).trim.split("/")
  val month = monthsplit(2).split(" ")(0) + monthsplit(0)
  val ctype = splitted(5).trim
  (month, ctype, 1)
})

// create (or reuse the spark-shell) SQLContext before registering the temp table,
// so the query below runs against the same context
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val cdf = crimmap.toDF("month", "ctype", "cnt")
cdf.registerTempTable("crimes")

val result = sqlContext.sql("select month, sum(cnt) as total, ctype from crimes group by month, ctype order by month asc, total desc")

val crimerdd = result.rdd.map(x => {
  val month = x.getString(0)
  val total = x.getLong(1)
  val ctype = x.getString(2)
  month + "\t" + total + "\t" + ctype
})

crimerdd.saveAsTextFile("/user/test/crime-text-comp", classOf[org.apache.hadoop.io.compress.GzipCodec])


#32

Do you find any error in the code? My output is different from what is shown in the video.

sqlContext.sql("select cast(concat(substr(date,7,4), substr(date,0,2)) as int) crime_date,crime_type,count(1) count_crime_type " +
"from crim group by cast(concat(substr(date,7,4), substr(date,0,2)) as int), crime_type " +
“order by crime_date,count_crime_type desc”).show

I’m getting this output:

+----------+--------------------+----------------+
|crime_date|          crime_type|count_crime_type|
+----------+--------------------+----------------+
|    200101|              SIMPLE|            7061|
|    200101|      $500 AND UNDER|            5125|
|    200101|           OVER $500|            2401|
|    200101|          TO VEHICLE|            2017|
|    200101|         TO PROPERTY|            1952|
|    200101|          AUTOMOBILE|            1518|
|    200101|         POSS: CRACK|            1460|
|    200101|      FORCIBLE ENTRY|            1452|
|    200101|POSS: CANNABIS 30...|            1190|
|    200101|    TELEPHONE THREAT|             979|
|    200101|HARASSMENT BY TEL...|             919|
|    200101| POSS: HEROIN(WHITE)|             876|


#33

I really appreciate this platform. However, for someone like me who is not yet an expert in coding, are there solutions available to these practice questions that I can compare with mine? In Python, please.

With great regards

Thank you


#34

Comparing the Python and Scala languages, can someone honestly recommend which is easier and simpler for the CCA 175 exam, please?
Your sincerity will be highly appreciated. Thank you.