Exercise 02 - Develop Scala based application using JDBC and typesafe config

Description

  • The intent of this problem is to understand the process of developing applications
  • Dependencies: MySQL JDBC driver, and typesafe.config for externalizing the JDBC URL
  • Use MySQL as the database. If you do not have one, install it locally.
  • Steps to set up the database
    • Create a database named HR
    • Switch to the HR database
    • Run this script; it will create a bunch of tables
    • List the tables
    • Make sure the employees table is in the list
    • Understand all the columns
  • There is also a database named hr in the lab (user name hr_ro, password itversity). You can connect to it instead of setting up a database locally.
  • Use good programming practices rather than trying to do everything in Scala (e.g., if you write SQL, try to get as much of the work as possible done in SQL itself)

Problem Statement

  • Let us get the commission earned by each employee
  • For every employee who has commission_pct set, compute the commission earned in dollars (by multiplying salary by commission_pct)
  • Print employee name, salary and commission earned
  • If commission_pct is null in the database, print “N/E” for commission earned
  1. Let us get the commission earned by each employee
    SQL> select employee_id, first_name, last_name, (salary * commission_pct) as Commission from employees;

  2. For every employee who has commission_pct set, compute the commission earned in dollars (by multiplying salary by commission_pct)
    SQL> select
    employee_id,
    first_name,
    last_name,
    salary,
    (commission_pct * salary) as CommisionInDolars
    from employees
    where commission_pct is not null

  3. Print employee name, salary and commission earned
    SQL> select
    first_name,
    last_name,
    salary,
    (salary*ifnull(commission_pct,0)) as Commision
    from employees

  4. If commission_pct is null in the database, print “N/E” for commission earned
    SQL>

select a.employee_id, a.first_name, a.salary,
(case when (a.comPct = 0.00)
THEN 'N/A'
ELSE (a.salary * a.commission_pct)
END) CommisionEarned
from
(select employee_id, first_name, last_name,
ifnull(commission_pct,0) comPct, salary, commission_pct
from employees) a;

-- Simplified version of the above

select employee_id, first_name, salary,
(case when (ifnull(commission_pct,0) = 0.00)
THEN 'N/A'
ELSE (salary * commission_pct)
END) CommisionEarned
from employees

package com.itversity.ScalaRDBMS

import com.typesafe.config.ConfigFactory
import java.sql.{Connection, DriverManager, ResultSet, Statement}

object EmpComission{

def main(args: Array[String]):Unit={

println("EmpComission.main(): trying to establish a connection with MySQL")

var con:Connection =  null
var stmt:Statement =  null
var rs:ResultSet   =  null

try
{
  Class.forName("com.mysql.jdbc.Driver")
  // Load parameters from file: appconfig.conf
  val config = ConfigFactory.load("appconfig.conf").getConfig("LocalConf.DBConnectionData")
  val HOST:String = config.getString("HOST")
  val DATABASE:String = config.getString("DATABASE")
  val USER_NAME:String = config.getString("USER_NAME")
  val PASSWORD:String = config.getString("PASSWORD")

  // HOST already ends with "/", so HOST+DATABASE forms the full JDBC URL
  con  = DriverManager.getConnection(HOST+DATABASE,USER_NAME,PASSWORD)
  //val con:java.sql.Connection  = DriverManager.getConnection("jdbc:mysql://nn01.itversity.com:3306/hr","hr_ro","itversity")

  stmt = con.createStatement()

  // A NULL commission_pct makes salary * commission_pct NULL; ifnull turns that into 0.00
  rs = stmt.executeQuery("select employee_id, first_name, last_name, ifnull((salary * commission_pct),0.00) "+
                          "as Commission from employees")

  var cnt = 0

  // while (rs.next()) println(rs.getInt("employee_id"))

  // Each step calls rs.next(); iteration stops at the first false, i.e. when the rows run out
  Iterator.continually( (rs,rs.next()) ).takeWhile(_._2).map(_._1).map(rs => {
    EmpComissionClass( rs.getInt("employee_id"),
              rs.getString("first_name"),
              rs.getString("last_name"),
              rs.getDouble("Commission")
          )
     }).foreach ( rec => {
       cnt = cnt+1
       println("Record : "+cnt+" => "+rec)
     } )

  println("The total Count is : "+cnt)

}
catch
{
  case e:Exception => println("EmpComission.main(): exception caught: " + e.getMessage)
}
finally
{
  // Release JDBC resources in reverse order of creation
  if (rs != null) rs.close()
  if (stmt != null) stmt.close()
  if (con != null) con.close()
  println("EmpComission.main(): finished, JDBC resources closed")
}

}

}
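
The query above turns a NULL commission into 0.00 inside SQL, so the Scala side can no longer tell "no commission" apart from a commission of zero, and the “N/E” requirement from the problem statement is lost. One possible way to keep that distinction is sketched below, assuming the query selects salary and commission_pct directly (without ifnull). With JDBC, getDouble returns 0 for SQL NULL, so wasNull() has to be checked right after the read. The names NullableCommission, nullableDouble and commissionText are hypothetical and not part of the original program.

```scala
import java.sql.ResultSet
import java.util.Locale

object NullableCommission {
  // Hypothetical helper: read a nullable numeric column as Option[Double].
  // JDBC's getDouble returns 0.0 for SQL NULL, so wasNull() must be checked
  // immediately after the read to tell NULL apart from a real zero.
  def nullableDouble(rs: ResultSet, column: String): Option[Double] = {
    val value = rs.getDouble(column)
    if (rs.wasNull()) None else Some(value)
  }

  // Hypothetical helper: commission in dollars with two decimals,
  // or "N/E" when commission_pct was NULL in the database.
  def commissionText(salary: Double, commissionPct: Option[Double]): String =
    commissionPct match {
      case Some(pct) => "%.2f".formatLocal(Locale.US, salary * pct)
      case None      => "N/E"
    }
}
```

Inside the row loop this could be used as `NullableCommission.commissionText(rs.getDouble("salary"), NullableCommission.nullableDouble(rs, "commission_pct"))`. Doing the same with a CASE expression in SQL, as in the queries earlier in the post, is equally valid and closer to the exercise's advice to push work into SQL.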

=====================================================

Case Class:

package com.itversity.ScalaRDBMS

case class EmpComissionClass(employee_id:Int,
                             first_name:String,
                             last_name:String,
                             commission:Double) {

  // Human-readable form used when printing each record
  override def toString():String={
    "FirstName : "+first_name+", LastName : "+last_name+", Commission = "+commission
  }

}

==================================================

config param:
//appconfig.conf

LocalConf
{
DBConnectionData
{
HOST = "jdbc:mysql://nn01.itversity.com:3306/"
DATABASE = "hr"
USER_NAME = "hr_ro"
PASSWORD = "itversity"
}
}

==================================

POM.XML:

<dependency>
  <groupId>com.typesafe</groupId>
  <artifactId>config</artifactId>
  <version>1.2.1</version>
</dependency>

In the above code, if I want to collect all the EmpComissionClass objects and pass them to the next layer, how do I add these objects to a list inside the loop?

Once the Iterator is completely processed, I want to have a List of objects of type “EmpComissionClass”.

Iterator.continually( (rs,rs.next()) ).takeWhile(_._2).map(_._1).map(rs => {
  EmpComissionClass( rs.getInt("employee_id"),
    rs.getString("first_name"),
    rs.getString("last_name"),
    rs.getDouble("Commission")
  )
}).foreach ( rec => {
  cnt = cnt+1
  println("Record : "+cnt+" => "+rec)
} )
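
One possible answer, as a minimal sketch rather than the only way: instead of appending to a list inside foreach, end the iterator chain with toList. The iterator is lazy, and toList forces it, so every row is read at that point; this has to happen before the ResultSet is closed. The names CollectRows, drain and readEmployees are hypothetical, and the case class is repeated here only to keep the sketch self-contained.

```scala
import java.sql.ResultSet

object CollectRows {
  // Copy of the case class from the post, repeated so the sketch stands alone.
  case class EmpComissionClass(employee_id: Int,
                               first_name: String,
                               last_name: String,
                               commission: Double)

  // Hypothetical helper: `advance` moves the cursor and reports whether a row
  // is available, `read` builds one value from the current row. toList forces
  // the lazy iterator, so all rows are read before this method returns.
  def drain[A](advance: () => Boolean)(read: () => A): List[A] =
    Iterator.continually(advance()).takeWhile(identity).map(_ => read()).toList

  // The same pattern applied to a JDBC ResultSet; call it before rs is closed.
  def readEmployees(rs: ResultSet): List[EmpComissionClass] =
    drain(() => rs.next()) { () =>
      EmpComissionClass(
        rs.getInt("employee_id"),
        rs.getString("first_name"),
        rs.getString("last_name"),
        rs.getDouble("Commission"))
    }
}
```

With this, `val employees = CollectRows.readEmployees(rs)` gives a List that can be passed to the next layer, and `employees.size` replaces the manual counter.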

If you have access to the lab, the user ID is hr_ro. I wasted 10 minutes finding this, so it may be useful to someone.

How can I create a database in the lab?

Hi Sir,

Please let me know whether this approach is acceptable.

I created the database in MySQL, copied the dataset from MySQL to HDFS using Sqoop, and then used SQLContext in Scala.

Code :

[Cloudera]$ sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/HR" --username root --password cloudera --table emp_details_view --target-dir /user/hive/aswin/emp_details_view -m 1

scala>

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

case class EmpDetails(
name : String,
salary : Double,
comm : Double
)

val a = sc.textFile("/user/hive/aswin/emp_details_view/part*")
val b = a.filter(rec => rec.split(",")(9) != "null")
val EmpDF = b.map(rec => {
  val r = rec.split(",")
  EmpDetails(r(6), r(8).toDouble, r(9).toDouble)
}).toDF

EmpDF.registerTempTable("EmployeeDetails")
sqlContext.sql("Select name, salary, (salary*comm) from EmployeeDetails").collect().foreach(println)

Output:

Thanks,
Aswin
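
One thing to watch in the approach above: the filter drops every row whose commission is "null", so those employees never appear in the output, and r(8).toDouble throws if a salary field is not numeric. A sketch of a more defensive parse is below. It assumes the column positions 6, 8 and 9 from the post and that no field contains an embedded comma; EmpParsing, EmpRow and parseEmp are hypothetical names.

```scala
import scala.util.Try

object EmpParsing {
  // Hypothetical record: comm is None when the file holds the literal text "null".
  case class EmpRow(name: String, salary: Double, comm: Option[Double])

  // Parse one comma-delimited line; returns None for short or malformed rows.
  // The split limit of -1 keeps trailing empty fields, which a plain
  // split(",") would silently drop.
  def parseEmp(line: String): Option[EmpRow] = {
    val r = line.split(",", -1)
    if (r.length < 10) None
    else Try {
      val comm =
        if (r(9) == "null" || r(9).isEmpty) None
        else Some(r(9).toDouble)
      EmpRow(r(6), r(8).toDouble, comm)
    }.toOption
  }
}
```

Rows with a missing commission are kept with `comm = None`, so they can still be printed with “N/E” as the problem statement asks, while rows that cannot be parsed at all come back as None and can be skipped.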