Exercise 05 - Develop word count program

pyspark
spark-shell
apache-spark
scala
python

#1
  • Details - Duration 20 minutes
    • Data is available in HDFS /public/randomtextwriter
    • Get word count for the input data using space as the delimiter (for each word, we need to get how many times it is repeated in the entire input data set)
    • Number of executors should be 10
    • Executor memory should be 3 GB
    • Executor cores should be 20 in total (2 per executor)
    • Number of output files should be 8
    • Avro dependency details: groupId -> com.databricks, artifactId -> spark-avro_2.10, version -> 2.0.1
    • Target Directory: /user/<YOUR_USER_ID>/solutions/solution05/wordcount
    • Target File Format: Avro
    • Target fields: word, count
    • Compression: N/A or default
  • Validation
  • Solution

#2

Where can I find the raw data?


#3

Mentioned in the first line…
/public/randomtextwriter
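
To confirm whether the path actually exists on the cluster you are logged in to, a quick check from the gateway node (standard HDFS CLI) is:

hdfs dfs -ls /public/randomtextwriter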


#4

It is not available on the Hortonworks sandbox; maybe it is on the itversity machine. That is the reason why I started this question.


#5

Ah you are working on the sandbox. Didn’t know that.
I can’t find the data in Sir Durga’s git either. :confused:


#6

Does anyone have the solution?


#7

spark-shell --master yarn \
--num-executors 10 \
--executor-memory 3GB \
--executor-cores 2 \
--packages com.databricks:spark-avro_2.10:2.0.1

import com.databricks.spark.avro._

val lines = sc.textFile("/public/randomtextwriter")
val wordsKV = lines.flatMap(_.split(" ")).map((_, 1))
val wordCountDF = wordsKV.reduceByKey(_ + _, 8).toDF("word", "count")

wordCountDF.write.avro("/target_dir/solutions/solution05/wordcount")

[It's taking forever to execute though, please feel free to throw some comments on making it run a little faster]


#8

val text = sc.textFile("/public/randomtextwriter")
val counts = text.flatMap(rec=>(rec.split(" "))).map(rec=>(rec,1)).reduceByKey((k,v)=>k+v,8)
import com.databricks.spark.avro._

val countsDF = counts.toDF("word","count")
countsDF.write.avro("/target/solution/solution05/wordcount")

It took a good 5 minutes for me to get the result


#9

Can anyone share the solution in PYSPARK?


#10

Here it is:
Launch pyspark as,

pyspark --master yarn \
--num-executors 10 \
--executor-memory 3G \
--total-executor-cores 20 \
--conf spark.ui.port=12999 \
--packages 'com.databricks:spark-avro_2.10:2.0.1'

In Pyspark shell,

wordFile = sc.textFile('/public/randomtextwriter')
words = wordFile.flatMap(lambda l: l.split(" "))
 
from operator import add

wordCount = words. \
map(lambda w: (w, 1)). \
reduceByKey(add)

for i in wordCount.take(10): print(i)

wordCount.toDF(schema=['word', 'count']).coalesce(8). \
write.format('com.databricks.spark.avro'). \
save('/user/snehaananthan/solutions/solution05/wordcount')
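
As a quick sanity check (not part of the exercise), the Avro output can be read back with the same spark-avro package; the path below is just the poster's target directory:

result = sqlContext.read.format('com.databricks.spark.avro'). \
load('/user/snehaananthan/solutions/solution05/wordcount')
result.show(10)

The number of part files under the target directory should be 8 because of coalesce(8).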

#11

Hi @sneha_ananthan
thank you very much.
I have two questions:

  1. Why have you specified --conf spark.ui.port=12999 when calling pyspark?

  2. Transforming the RDD wordCount to a DataFrame, you used .toDF(schema=['var1', 'var2']). Does that avoid the need for:
from pyspark.sql import Row etc.?
Is that correct?


#12

@graschella,

Answers to the questions:

  1. Sorry for the typo, I have edited my post now.

It is the port where our Spark application UI will be available, to monitor the following:

spark.ui.port - Port for your application's dashboard, which shows memory and workload data.

We give the above conf property only in our labs; it's not necessary in the exam.
While connecting to the lab, this property helps launch Spark quickly without relying on dynamic port allocation for the UI. This is my understanding; anyone who has more knowledge on this can add inputs.

  2. Yes, you are right: when there is an RDD that needs to be converted to a DataFrame without the need to typecast the column values, we can use toDF(schema=[...]) for simplicity.
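
For comparison, here is a minimal sketch of the Row-based route that toDF(schema=[...]) lets you skip; the variable names follow the earlier pyspark post and are only illustrative:

from pyspark.sql import Row

# build Row objects explicitly, then let the SQLContext infer the schema
rowRDD = wordCount.map(lambda pair: Row(word=pair[0], count=pair[1]))
wordCountDF = sqlContext.createDataFrame(rowRDD)
wordCountDF.printSchema()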

#13

I’m getting special characters after applying reduceByKey. Does anyone know why?


#14

Hi, answered in http://discuss.itversity.com/t/word-count-program/10759/2?u=sneha_ananthan


#15

Not only did it take a long time to process, it also threw a lot of exceptions. In the end, I got the 8 output files, but I am not sure if the output is proper.

Lost task 14.0 in stage 0.0 (TID 7, wn07.itversity.com): java.net.SocketException: Connection reset

Lost task 108.0 in stage 0.0 (TID 200, wn06.itversity.com): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)

Lost task 181.0 in stage 0.0 (TID 266, wn06.itversity.com): java.net.SocketException: Connection reset

Lost task 228.0 in stage 0.0 (TID 285, wn06.itversity.com): java.net.SocketException: Connection reset

Below is my program:

pyspark --master yarn --conf spark.ui.port=12888 \
--num-executors 8 \
--packages com.databricks:spark-avro_2.10:2.0.1 \
--total-executor-cores 20 \
--executor-memory 3G

lines = sc.textFile("/public/randomtextwriter")
words = lines.flatMap(lambda x : x.split(" "))
wordMap = words.map(lambda x: (x,1))
wordCount = wordMap.reduceByKey(lambda x,y:x+y)

df = wordCount.toDF(schema=["word", "count"])
df.coalesce(8).write.format('com.databricks.spark.avro').save(output path)


#16

It looks good. There might be some intermittent issue.


#17

Hi @sneha_ananthan

since it’s not specified in the question, is it necessary to write "--master yarn" in the code?

Please @itversity help me

Thank you


#18

In fact I got the same error, but by debugging I found out it is not a text file.

After executing the below two commands:

lines = sc.textFile("/public/randomtextwriter")
for i in lines.take(10): print i

SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text�1��J$�גd�XWpterostigma ste prelationship pleasurehood abusiveness seelful unstipulated winterproof �gmerica rp pentosuria airfreighter orthopedical symbiogenetic…

Then I observed that the first line “SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text�”
clearly indicates it is a sequence file. So we need to use sc.sequenceFile("/public/randomtextwriter"); then, before applying flatMap, since it is a key-value pair, we need to keep only the value part and proceed with further transformations…

# the value part (p[1]) holds the text; split it into words and pair each word with 1
tupels = sc.sequenceFile("/public/randomtextwriter").map(lambda p: p[1]).flatMap(lambda p: p.split(" ")).map(lambda p: (p, 1))

from operator import add

resultdf = tupels.reduceByKey(add).toDF(["word", "count"])


#19

I think it is a sequence file, not a text file.

lines = sc.textFile("/public/randomtextwriter")
for i in lines.take(10): print i

SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text�1��J$�גd�XWpterostigma ste prelationship pleasurehood abusiveness seelful unstipulated winterproof �gmerica rp pentosuria airfreighter orthopedical symbiogenetic…

Then I observed that the first line “SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text�”
clearly indicates it is a sequence file. So we need to use sc.sequenceFile("/public/randomtextwriter")…

Please correct me if I am wrong.


#20

Yes, you are right. We should load it as a sequence file instead of a text file. I did not realize the input content was a sequence file. Thanks!
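
Putting the thread's conclusion together, here is a minimal end-to-end pyspark sketch; treat it as one possible wiring rather than an official solution. The launch options follow the requirements in #1 (--executor-cores 2 across 10 executors gives the 20 cores in total), and <YOUR_USER_ID> has to be replaced with your own user ID:

pyspark --master yarn \
--num-executors 10 \
--executor-memory 3G \
--executor-cores 2 \
--packages com.databricks:spark-avro_2.10:2.0.1

Then, in the pyspark shell:

from operator import add

# the input is a sequence file, so read (key, value) pairs and keep only the value (the text)
lines = sc.sequenceFile('/public/randomtextwriter').map(lambda kv: kv[1])

# split on spaces, pair each word with 1 and add up the counts
wordCount = lines. \
flatMap(lambda line: line.split(' ')). \
map(lambda word: (word, 1)). \
reduceByKey(add)

# 8 output files in Avro format with fields word and count
wordCount.toDF(schema=['word', 'count']). \
coalesce(8). \
write.format('com.databricks.spark.avro'). \
save('/user/<YOUR_USER_ID>/solutions/solution05/wordcount')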