Exercise 01 - Get monthly crime count by type

pyspark
spark-shell
apache-spark
scala
python

#1

Before attempting these questions, make sure you prepare by going through the appropriate material.


Here are the Udemy coupons for our certification courses. Our coupons include 1 month lab access as well.

  • Click here for $35 coupon for CCA 175 Spark and Hadoop Developer using Python.
  • Click here for $35 coupon for CCA 175 Spark and Hadoop Developer using Scala.
  • Click here for $25 coupon for HDPCD:Spark using Python.
  • Click here for $25 coupon for HDPCD:Spark using Scala.
  • Click here for access to a state-of-the-art 13-node Hadoop and Spark cluster

  • Details - Duration 40 minutes
    • Data set URL
    • Choose the language of your choice: Python or Scala
    • Data is available in HDFS file system under /public/crime/csv
    • You can check properties of files using hadoop fs -ls -h /public/crime/csv
    • Structure of data (ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location)
    • File format - text file
    • Delimiter - ","
    • Get the monthly count of primary crime type, sorted by month in ascending order and by number of crimes per type in descending order
    • Store the result in HDFS path /user/<YOUR_USER_ID>/solutions/solution01/crimes_by_type_by_month
    • Output File Format: TEXT
    • Output Columns: Month in YYYYMM format, crime count, crime type
    • Output Delimiter: \t (tab delimited)
    • Output Compression: gzip
  • Validation (see the read-back sketch after this list)
  • Solutions
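
For the Validation step, here is a minimal read-back sketch in pyspark, assuming the output path above and that sc is available in a pyspark shell. Spark reads gzip-compressed text transparently, so you can re-load the saved result and spot-check the format and ordering.

# Re-read the saved output and preview it (expect YYYYMM<TAB>count<TAB>crime type)
validation = sc.textFile("/user/<YOUR_USER_ID>/solutions/solution01/crimes_by_type_by_month")
for line in validation.take(10): print(line)
validation.count()  # total number of (month, crime type) groups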

#2

In Python using Core API

CrimeData = sc.textFile("Location")
Header = CrimeData.first()
CrimeDataWithoutHeader = CrimeData.filter(lambda line: line != Header)

def bringDate(crime):
    extractTimestamp = crime.split(",")[2]
    extractDate = extractTimestamp.split(" ")[0]
    extractYear = extractDate.split("/")[2]
    extractMonth = extractDate.split("/")[1]
    requiredDate = int(extractYear + extractMonth)
    return requiredDate

CriminalRecordsWithMonthAndType = CrimeDataWithoutHeader.map(lambda crime: ((bringDate(crime), crime.split(",")[5]), 1))
for i in CriminalRecordsWithMonthAndType.take(20): print(i)

crimeCountPerMonthPerType = CriminalRecordsWithMonthAndType.reduceByKey(lambda total, count: total + count)
for i in crimeCountPerMonthPerType.take(20): print(i)

# Re-key as ((month, -count), type) so that sortByKey gives month ascending and count descending
crimeCountPerMonthPerTypeMap = crimeCountPerMonthPerType.map(lambda rec: ((rec[0][0], -rec[1]), rec[0][1]))
for i in crimeCountPerMonthPerTypeMap.take(20): print(i)

crimeCountPerMonthPerTypeSorted = crimeCountPerMonthPerTypeMap.sortByKey()
for i in crimeCountPerMonthPerTypeSorted.take(20): print(i)

crimeCountPerMonthPerTypeSortedResults = crimeCountPerMonthPerTypeSorted.map(lambda crime: str(crime[0][0]) + "\t" + str(-crime[0][1]) + "\t" + crime[1])
for i in crimeCountPerMonthPerTypeSortedResults.take(20): print(i)

crimeCountPerMonthPerTypeSortedResults.repartition(2).saveAsTextFile(path="Location", compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

PySpark shell code (Spark SQL approach)
pyspark --master yarn --conf spark.ui.port=12643

CrimeData = sc.textFile("Location")
Header = CrimeData.first()
CrimeDataWithoutHeader = CrimeData.filter(lambda line: line != Header)

from pyspark.sql import Row

CrimeDataWithDateAndTypeDF = CrimeDataWithoutHeader.map(lambda crime: Row(crime_date = (crime.split(",")[2]), crime_type = (crime.split(",")[5]))).toDF()

CrimeDataWithDateAndTypeDF.registerTempTable("crime_data")

crimeCountPerMonthPerTypeDF = sqlContext.sql("select cast(concat(substr(crime_date, 7, 4), substr(crime_date, 0, 2)) as int) crime_month, count(1) crime_count_per_month_per_type, crime_type from crime_data group by cast(concat(substr(crime_date, 7, 4), substr(crime_date, 0, 2)) as int), crime_type order by crime_month, crime_count_per_month_per_type desc")

crimeCountPerMonthPerTypeDF.show(100)

crimeCountPerMonthPerTypeDF.rdd.map(lambda row: str(row[0]) + "\t" + str(row[1]) + "\t" + row[2]).coalesce(1).saveAsTextFile(path="Location", compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")


#3

Hey @raamukaakaa!
I’ve tried your code as well, and I think I have found a little mistake.
In your reduceByKey step you are not reducing by year and month as needed; you are reducing by year and day!

See here:
In:
CriminalRecordsWithMonthAndType.take(5)

Out:
[((200724, u'CRIMINAL DAMAGE'), 1), ((200724, u'BATTERY'), 1), ((200724, u'BATTERY'), 1), ((200721, u'THEFT'), 1), ((200726, u'CRIMINAL DAMAGE'), 1)]

Or not? I hope I’m not wrong :relaxed:

Best regards,
Yannik
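
For reference, a minimal corrected version of the bringDate helper from the previous post (assuming the Date column is in MM/dd/yyyy format, as the sample output above suggests) would take index 0 for the month rather than index 1:

# With a date like "07/24/2007 12:00:00 AM", split("/")[0] is the month,
# split("/")[1] is the day and split("/")[2] is the year, so the key
# becomes e.g. 200707 instead of 200724.
def bringDate(crime):
    extractDate = crime.split(",")[2].split(" ")[0]   # "07/24/2007"
    extractYear = extractDate.split("/")[2]
    extractMonth = extractDate.split("/")[0]
    return int(extractYear + extractMonth)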


#4

I bought the CCA 175 course from Udemy. I do not see a video for getting the crime dataset, and it is not available in your GitHub URL either. Please do not force us to buy an itversity lab subscription. Please let me know where I can get the dataset (URL) for this exercise to practice.


#5

Here it is: https://github.com/dgadiraju/data/tree/master/retail_db

Best regards,
Yannik


#6

Here is my version of the Python code. Let me know if it can be improved.

def fetchDateAndType(record):
	recItems = record.split(",")
	inputdate = recItems[2]
	datetime = inputdate.split(" ")
	date = datetime[0].split("/")
	crimeDate = int(date[2]+date[0])
	crimeType = str(recItems[5])
	return crimeDate,crimeType

crimeData = sc.textFile("/public/crime/csv")
header = crimeData.first()
crimeDataWOHeader = crimeData.filter(lambda x : x != header)

crimeDF = crimeDataWOHeader.map(lambda rec: fetchDateAndType(rec)).toDF(["Crime_date","Crime_type"])

crimeDF.registerTempTable("crime_data")

crimeResults = sqlContext.sql("SELECT Crime_date,count(1) as Count, Crime_type from crime_data group by Crime_date,Crime_type order by Crime_date,Count desc ")

crimeResults.map(lambda rec : "\t".join([str(x) for x in rec])).coalesce(1).saveAsTextFile("/user/udinakar/solutions/sol01py/crimes_by_type_by_month",compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

#7

My code in Scala, using DataFrames and Spark SQL:

val data = sc.textFile("/public/crime/csv")
val header = data.first()
val datafile = data.filter(rec => rec != header)
val datac1 = datafile.map(rec => {var d = rec.split(",");( d(2), d(5))})

case class datacc (
datecol: String,
ctype: String);

val datac1DF = datac1.map(r => datacc(r._1, r._2)).toDF
datac1DF.registerTempTable("datatable")

sqlContext.sql("select concat(substr(datecol, 7, 4), substr(datecol, 0, 2)) month, ctype, count(1) no_of_crimes from datatable group by concat(substr(datecol, 7, 4), substr(datecol, 0, 2)), ctype order by month, no_of_crimes desc").rdd.map(r => r(0)+"\t"+ r(1)+"\t" + r(2)).repartition(1).saveAsTextFile("/user/cloudera/output", classOf[org.apache.hadoop.io.compress.GzipCodec])


#8

I like your code.
It seems that no one recognised that the aim was to have headers with white space in between. I can't get it to work either. When I try to escape my SQL in sqlContext.sql, it seems it is not possible to form names like "crime type" instead of "crime_type".
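
A hedged sketch of one way around this, shown in pyspark and assuming the crime_data temp table registered in an earlier post (the same backtick quoting works from Scala's sqlContext.sql): Spark SQL accepts backtick-quoted identifiers, so an alias with a space can be written as `crime type`. Note that some output formats, such as parquet, still reject such column names, as the error in a later post shows.

# Alias columns with spaces by backtick-quoting them inside the SQL string
dfWithSpaces = sqlContext.sql(
    "select crime_type as `crime type`, count(1) as `crime count` "
    "from crime_data group by crime_type")
dfWithSpaces.printSchema()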


#9

There are two videos in which Sir explains this; I will try to find them and share here :slight_smile:. Sir does not enforce the lab :slight_smile:


#10

import java.text.SimpleDateFormat

val inputDate = new SimpleDateFormat("MM/dd/yyyy")
val outputDate = new SimpleDateFormat("yyyyMM")

val crimeRDD = sc.textFile("/public/crime/csv").
mapPartitionsWithIndex( (ind, lines) => {
if(ind == 0) lines.drop(1) else lines }).
map( cData => {
val data = cData.split(",")
val date = data(2).trim.substring(0,10)
(outputDate.format(inputDate.parse(date)), data(5).trim)
})

val crimeDF = crimeRDD.toDF("crime_date", "crime_type")

val monthlyCrimeCountByTypeDF = crimeDF.groupBy($"crime_date", $"crime_type").
agg(count($"crime_type").alias("crime_count")).
orderBy($"crime_date".asc, $"crime_count".desc)

val monthlyCrimeCountByTypeRDD = monthlyCrimeCountByTypeDF.rdd.
map(rec => rec.mkString("\t")).coalesce(1)

monthlyCrimeCountByTypeRDD.saveAsTextFile("output_dir", classOf[org.apache.hadoop.io.compress.GzipCodec])


#11

He's not forcing you, just showing you a path where you can use Hadoop on a cluster. And one more thing: it's the only way he earns. You should be thankful that you're learning this technology without any charge. Happy Learning :slight_smile:


#12

val recs = sc.textFile("/public/crime/csv").filter(rec=> !rec.startsWith("ID"))
case class crimes(month:String,crime :String)
import sqlContext.implicits._

val crimesDF = recs.map(rec=>{
	val ob = rec.split(",")
	val date = ob(2).split(" ")(0)
	val dateMonth = date.split("/")
    crimes(dateMonth(2).toString+dateMonth(0).toString,ob(5))
}).toDF()

crimesDF.registerTempTable("crimeRecords")

val result = sqlContext.sql("select month,crime,count(crime) cnt from crimeRecords group by month,crime  order by month, cnt desc")

val output = result.map(rec=>{
	rec(0)+"\t"+rec(2)+"\t"+rec(1)
})

output.saveAsTextFile("target/solutions/solution01/crimes_by_type_by_month",classOf[org.apache.hadoop.io.compress.GzipCodec])


#13

#using PySpark
from datetime import datetime
crimefile=sc.textFile("/public/crime/csv/*")
crimefirst=crimefile.first();
crimerdd=crimefile.filter(lambda rec: rec!=crimefirst)
crimemap=crimerdd.map(lambda rec: ((datetime.strptime(rec.split(",")[2],'%m/%d/%Y %H:%M:%S %p').strftime('%Y%m'),rec.split(",")[5]),rec.split(",")[0]))
crimecountbytype=crimemap.aggregateByKey(0,
lambda inter,id: inter+1,
lambda final,inter: final+inter)
crimecountmap=crimecountbytype.map(lambda rec: ((rec[0][0],-rec[1]),rec[0][1]))
crimecountsort=crimecountmap.sortByKey()
crimecount=crimecountsort.map(lambda rec: rec[0][0]+"\t"+str(-rec[0][1])+"\t"+rec[1])
crimecount.saveAsTextFile(path="Location",compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")


#16

My solution for this problem:

crimeData = sc.textFile("/public/crime/csv")
header = crimeData.first()
crimeDataWOHeader = crimeData.filter(lambda x : x != header)
crimeDF = crimeDataWOHeader.map(lambda x: (str(x.split(",")[2].split(" ")[0].split("/")[2] + x.split(",")[2].split(" ")[0].split("/")[0]), x.split(",")[5])).toDF(["Crime_date","Crime_type"])
from pyspark.sql.functions import desc
crimeResults=crimeDF.groupBy("Crime_date","Crime_type").count().orderBy("Crime_date",desc("count"))
crimeResults.map(lambda rec : "\t".join([str(x) for x in rec])).coalesce(1).saveAsTextFile("/user/arkahome94/solutions/sol01py/crimes_by_type_by_month",compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")


#17

I tried saving the data with spaces in the column names, and it shows the error below:

crimeMonthCountNames.write.save("/play_sq/crime_by_month_d")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 397, in save
self._jwrite.save(path)
File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 51, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Attribute name "MOnth in YYYYMM format" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.'
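
A hedged sketch of the fix the error message suggests (crimeMonthCountNames and the column name are taken from the snippet above; any other columns containing spaces would need the same treatment):

# Rename the offending column(s) before writing, then save as before
crimeMonthCountRenamed = crimeMonthCountNames.withColumnRenamed(
    "MOnth in YYYYMM format", "crime_month")
crimeMonthCountRenamed.write.save("/play_sq/crime_by_month_d")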


#19

We can rename the headers like:

results_RDD = crimeMonthCountNames. \
toDF(schema=['Month', 'crime count', 'crime type']). \
rdd.map(tuple). \
map(lambda c:
    str(c[0]) + "\t" + str(c[1]) + "\t" + str(c[2])
)

But what I'm missing here is: do the header names really matter when we save the data in text file format? The results are just the data, and the headers are not saved anywhere (not even as metadata for text files) unless we explicitly add a header row to the RDD before saving it, which I don't think is expected of us in the exam. After all, the header row might not be part of the expected solution and we might end up getting the whole question wrong! :confused:

Someone who has got a text file format question right, can add some inputs… Thanks!
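
If a header line really were required, a minimal sketch (results_RDD being the tab-delimited RDD built above) would be to union a one-line RDD in front of the data before saving; with coalesce(1) the header should end up as the first line of the single part file.

# Prepend a header row to the tab-delimited output
headerRDD = sc.parallelize(["Month\tcrime count\tcrime type"])
resultsWithHeader = headerRDD.union(results_RDD).coalesce(1)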


#20

Yes, I tried this: you can't save data with spaces in the headers to any of the formats like parquet, avro, orc, or textfile (you can do it by force-adding a header line).
But you can with JSON, as in df.toJSON().saveAsTextFile().

However, when I checked with some folks here, they said not to worry much about headers.
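
A minimal sketch of the JSON route mentioned above (df and json_output_dir are placeholders; df stands in for the DataFrame with the spaced column names): toJSON() turns each row into a JSON string, so the column names, spaces included, travel inside every record.

# Each output line is a JSON document carrying the column names
df.toJSON().saveAsTextFile("json_output_dir",
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")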


#21

I tried to do .substring(0,10);
it looks like some records don't have enough length for the date (maybe a junk date value), and it was failing with an ArrayIndexOutOfBoundsException.

Is it true that some records have a bad date?
(I ended up checking for length > 10 and then doing the substring to avoid the error.)

Please clarify.
Thanks.
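
A hedged pyspark sketch of the guard described above (crimeDataWOHeader being the header-stripped RDD from earlier posts): only slice the date when the field is long enough, so short or malformed values are dropped instead of crashing the job. The original issue was with Scala's substring; this mirrors the same length check.

# Keep only records whose third field looks long enough to hold MM/dd/yyyy
def safeMonth(line):
    fields = line.split(",")
    dateField = fields[2] if len(fields) > 2 else ""
    return dateField[:10] if len(dateField) >= 10 else None

validDates = crimeDataWOHeader.map(safeMonth).filter(lambda d: d is not None)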


#22

Hi @panamare,

Actually, I don't understand the part where he or she answered you saying "I just followed video content and saved output files as mentioned in the videos". Which videos?


#23

Ok, so most of the folks writing on this forum are following Durga sir's tutorial videos to prepare for the CCA175 certification exam. You can check YouTube for the channel 'itversity'.

You can follow the link below for solving the certification exam using pyspark:

You can follow the link below for solving the exam using scala: