Exercise 03 - Develop word count program using Scala IDE

Resources:

  • Click here for $35 coupon for CCA 175 Spark and Hadoop Developer using Python.
  • Click here for $35 coupon for CCA 175 Spark and Hadoop Developer using Scala.
  • Click here for $25 coupon for HDPCD:Spark using Python.
  • Click here for $25 coupon for HDPCD:Spark using Scala.
  • Click here for access to a state-of-the-art 13-node Hadoop and Spark cluster

Description

  • Here we will try to understand the development life cycle of a Spark application
  • Dependencies: Spark Core and typesafe config

Problem Statement

  • Develop a word count program using Scala IDE
  • Read the data from files and write the word count output back to files
  • Make sure the input and output paths are passed as arguments
  • Also use typesafe config to externalize the execution mode
  • Use the Scala interpreter and preview the data after each step using Spark APIs (see the sketch after this list)
  • Test it on your PC
  • Build the jar file
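
Below is a minimal sketch of previewing each step in the Scala interpreter (spark-shell), where sc is already available; the input path is the lab dataset used in the spark-submit command at the end of this post:

// Preview a few records after each transformation using take()
val fileReadRdd = sc.textFile("/public/randomtextwriter/")
fileReadRdd.take(5).foreach(println)        // raw lines

val fileFlat = fileReadRdd.flatMap(rec => rec.split(" "))
fileFlat.take(5).foreach(println)           // individual words

val fileOne = fileFlat.map(rec => (rec, 1))
fileOne.take(5).foreach(println)            // (word, 1) tuples

val fileReduce = fileOne.reduceByKey(_ + _)
fileReduce.take(5).foreach(println)         // (word, count) tuples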

application.properties

dev.deploymentMode = local
prod.deploymentMode = yarn-client
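
As a quick sanity check (a minimal sketch, assuming application.properties is placed under src/main/resources so it ends up on the classpath), this is how the typesafe config lookup resolves the deployment mode for each environment prefix:

import com.typesafe.config.ConfigFactory

// load() picks up application.properties from the classpath
val props = ConfigFactory.load()
props.getConfig("dev").getString("deploymentMode")    // "local"
props.getConfig("prod").getString("deploymentMode")   // "yarn-client"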

wordCount.scala
/**
  * Created by Ravinder on 3/20/2017.
  */

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.hadoop.fs._
import com.typesafe.config._
import scala.util.matching._

object wordCount {

  def main(args: Array[String]): Unit = {

    // Load application.properties from the classpath and use the environment
    // passed as the first argument (dev or prod) to pick the deployment mode
    val props = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Word Count").
      setMaster(props.getConfig(args(0)).getString("deploymentMode"))
    val sc = new SparkContext(conf)

    val fileInput = args(1)
    val fileOutput = args(2)

    // Validate that the input path exists and delete the output path
    // if it is already there, so saveAsTextFile does not fail
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(fileInput))
    val outputPathExists = fs.exists(new Path(fileOutput))

    if (!inputPathExists) {
      println("Input path does not exist")
      return
    }

    if (outputPathExists) {
      println("Deleting existing output dir")
      fs.delete(new Path(fileOutput), true)
    }

    // Read the input, strip special characters, split records into words,
    // map each word to (word, 1) and aggregate the counts per word
    val fileReadRdd = sc.textFile(fileInput)
    val stripCurly = "[{}~!@#$%^&*()_=`:'?/<>.,-]"
    val fileReadRdd2 = fileReadRdd.map(x => x.replaceAll(stripCurly, ""))

    val fileFlat = fileReadRdd2.flatMap(rec => rec.split(" "))
    val fileOne = fileFlat.map(rec => (rec, 1))
    val fileReduce = fileOne.reduceByKey(_ + _)
    // fileReduce.take(105).foreach(println)
    fileReduce.saveAsTextFile(fileOutput)

  }

}
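
To test it on the PC (the paths below are just placeholders for local directories), the program can be run directly from Scala IDE as a Scala application with program arguments such as:

dev /path/to/local/input /path/to/local/output

With the dev environment the master resolves to local from application.properties, so no cluster is needed.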

build.sbt
name := "WordC"

version := "1.0"

scalaVersion := "2.10.6"

libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.36"

libraryDependencies += "com.typesafe" % "config" % "1.3.1"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0"

libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0"
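
With this build file the jar can be built from the project directory using the standard sbt workflow:

sbt package

The packaged jar ends up at target/scala-2.10/wordc_2.10-1.0.jar (the artifact name follows from name, scalaVersion and version above), which is the jar referenced in the spark-submit command below.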

After running on the client there is no issue; however, in the lab I am getting the message spark.yarn.driver.memoryOverhead is set but does not apply in client mode. (As far as I can tell this is only a warning: in yarn-client mode the driver runs locally, and the corresponding setting is spark.yarn.am.memoryOverhead.)

Running with the following works (prod picks the yarn-client deployment mode from application.properties; the last two arguments are the input and output paths):
spark-submit --class "wordCount" --master yarn --total-executor-cores 1 wordc_2.10-1.0.jar prod /public/randomtextwriter/ wc_big2

Can I mark this exercise as complete?