Exercise 21 - Spark JDBC - Generate order revenue by reading data from MySQL tables

spark-sql
apache-spark
jdbc
#1

Problem Statement:

  • Generate order revenue for each day (also sort by order_date)
  • Query data from MySQL (orders and order_items tables)
  • MySQL connector dependency: libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.12"
  • JDBC URL hint: url = "jdbc:mysql://hn:pn/dn?user=un&password=pwd"
  • Launch the Scala interpreter for validation like this: spark-shell --driver-class-path /usr/share/java/mysql-connector-java.jar (a minimal read sketch follows below)
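
For reference, a single table can be loaded into a DataFrame from spark-shell roughly like this (a minimal sketch; host, port, database, user, and password are the placeholders from the URL hint above):

// sqlContext is available by default in spark-shell (Spark 1.x)
val url = "jdbc:mysql://hn:pn/dn?user=un&password=pwd"
val ordersDF = sqlContext.read.format("jdbc").
  option("url", url).
  option("dbtable", "orders").
  load()
ordersDF.show(10)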

Please provide the following:

  • Query and subsequent processing
  • Sample output - 10 records

#2

import org.apache.spark.sql.functions._

val orders_mysl_df = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://nn01.itversity.com/retail_db").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "orders").option("user", "retail_dba").option("password", "itversity").load()
val orderitems_mysl_df = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://nn01.itversity.com/retail_db").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "order_items").option("user", "retail_dba").option("password", "itversity").load()
val ordersFiltered = orders_mysl_df.filter(orders_mysl_df("order_status") === "COMPLETE")
val orders_orderitem_Join = ordersFiltered.join(orderitems_mysl_df, ordersFiltered("order_id") === orderitems_mysl_df("order_item_order_id"))
val rev_orders = orders_orderitem_Join.groupBy("order_date").agg(sum("order_item_subtotal")).sort("order_date")
rev_orders.show()

output:
+--------------------+------------------------+
| order_date|sum(order_item_subtotal)|
+--------------------+------------------------+
|2013-07-25 00:00:…| 20030.32|
|2013-07-26 00:00:…| 42165.88|
|2013-07-27 00:00:…| 33156.21000000002|
|2013-07-28 00:00:…| 27012.91|
|2013-07-29 00:00:…| 45898.65000000002|
|2013-07-30 00:00:…| 40590.21|
|2013-07-31 00:00:…| 46503.829999999994|
|2013-08-01 00:00:…| 38231.409999999996|
|2013-08-02 00:00:…| 36633.44|
|2013-08-03 00:00:…| 34828.71|
|2013-08-04 00:00:…| 26161.97|
|2013-08-05 00:00:…| 25804.330000000005|
|2013-08-06 00:00:…| 41413.79|
|2013-08-07 00:00:…| 31533.100000000002|
|2013-08-08 00:00:…| 27359.070000000003|
|2013-08-09 00:00:…| 22091.95|
|2013-08-10 00:00:…| 39038.720000000016|
|2013-08-11 00:00:…| 21302.690000000002|
|2013-08-12 00:00:…| 41139.11000000001|
|2013-08-13 00:00:…| 12468.53|
+--------------------+------------------------+
only showing top 20 rows
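
To make the revenue column easier to scan, the aggregate could also be rounded and aliased; a minimal variation on the last step above (assuming the same joined DataFrame and the org.apache.spark.sql.functions._ import):

// round to two decimals and give the aggregated column a readable alias
val rev_orders = orders_orderitem_Join.
  groupBy("order_date").
  agg(round(sum("order_item_subtotal"), 2).alias("revenue")).
  sort("order_date")
rev_orders.show()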


#3

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object MysqlJoin {
  def main(args: Array[String]) {
    // Spark Context
    val conf = new SparkConf().setAppName("RevPerDayDF")
    val sc = new SparkContext(conf)
    // SQL Context
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    sqlContext.setConf("spark.sql.shuffle.partitions", "2")
    import sqlContext.implicits._

    val OrderDF = sqlContext.read.format("jdbc").options(
      Map("url" -> "jdbc:mysql://nn01.itversity.com:3306/retail_db?user=retail_dba&password=itversity",
        "dbtable" -> "retail_db.orders")).load()

    val OrderITDF = sqlContext.read.format("jdbc").options(
      Map("url" -> "jdbc:mysql://nn01.itversity.com:3306/retail_db?user=retail_dba&password=itversity",
        "dbtable" -> "retail_db.order_items")).load()

    OrderDF.registerTempTable("orders")
    OrderITDF.registerTempTable("order_items")

    sqlContext.sql("Select order_date, sum(order_item_subtotal) from orders join order_items on order_id = order_item_order_id where order_status = 'COMPLETE' group by order_date").rdd.saveAsTextFile("/user/saswat232/retail_db/DFSQLJoin")
  }
}

Output:

[2013-07-25 00:00:00.0,20030.320388793945]
[2013-07-27 00:00:00.0,33156.210554122925]
[2013-07-29 00:00:00.0,45898.65076828003]
[2013-07-30 00:00:00.0,40590.21075248718]
[2013-08-02 00:00:00.0,36633.44071006775]
[2013-08-04 00:00:00.0,26161.970418930054]
[2013-08-06 00:00:00.0,41413.79079246521]
[2013-08-08 00:00:00.0,27359.07057762146]
[2013-08-11 00:00:00.0,21302.69044494629]
[2013-08-13 00:00:00.0,12468.530212402344]
[2013-08-15 00:00:00.0,37473.12078857422]
[2013-08-17 00:00:00.0,52940.94108963013]
[2013-08-19 00:00:00.0,10603.870204925537]
[2013-08-20 00:00:00.0,32680.050630569458]
[2013-08-22 00:00:00.0,30542.99054336548]
[2013-08-24 00:00:00.0,39566.76083564758]
[2013-08-26 00:00:00.0,32764.19061088562]
[2013-08-28 00:00:00.0,15508.830348968506]
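
The exercise also asks for the result to be sorted by order_date; an order by clause can be appended to the same query before saving (a small sketch assuming the temp tables registered above; the output path is illustrative):

sqlContext.sql("select order_date, sum(order_item_subtotal) revenue " +
  "from orders join order_items on order_id = order_item_order_id " +
  "where order_status = 'COMPLETE' " +
  "group by order_date order by order_date").
  rdd.saveAsTextFile("/user/saswat232/retail_db/DFSQLJoinSorted")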


#4

Spark code:

import com.typesafe.config._
import org.apache.spark.SparkContext, org.apache.spark.SparkConf
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql._

object MysqlDF {
  def main(args: Array[String]) {
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().setAppName("Revenue using DF").setMaster(appConf.getConfig(args(0)).getString("deploymentMode"))
    val sc = new SparkContext(conf)
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Build the JDBC URL from application.properties (credentials are embedded in the URL)
    val url = "jdbc:mysql://" +
      appConf.getString("hostname-port") + "/" + appConf.getString("databaseName") +
      "?user=" + appConf.getString("username") + "&password=" + appConf.getString("passwd")

    // Push the aggregation down to MySQL by passing a subquery as the dbtable option
    val revenuePerDayDF = sqlContext.read.format("jdbc").option("url", url).
      option("dbtable", "(select o.order_date, sum(oi.order_item_subtotal) revenue_per_day" +
        " from orders o join order_items oi" +
        " on o.order_id = oi.order_item_order_id" +
        " where o.order_status = 'COMPLETE'" +
        " group by o.order_date order by order_date) query").load()

    revenuePerDayDF.show()
  }
}

application.prop:
hostname-port=nn01.itversity.com:3306
databaseName=retail_db
username=retail_dba
passwd=itversity

Output:

+--------------------+------------------+
| order_date| revenue_per_day|
+--------------------+------------------+
|2013-07-25 00:00:…|20030.320388793945|
|2013-07-26 00:00:…| 42165.8807926178|
|2013-07-27 00:00:…|33156.210554122925|
|2013-07-28 00:00:…|27012.910556793213|
|2013-07-29 00:00:…| 45898.65076828003|
|2013-07-30 00:00:…| 40590.21075248718|
|2013-07-31 00:00:…|46503.830892562866|
|2013-08-01 00:00:…| 38231.41069984436|
|2013-08-02 00:00:…| 36633.44071006775|
|2013-08-03 00:00:…| 34828.71069145203|
|2013-08-04 00:00:…|26161.970418930054|
|2013-08-05 00:00:…|25804.330533981323|
|2013-08-06 00:00:…| 41413.79079246521|
|2013-08-07 00:00:…|31533.100637435913|
|2013-08-08 00:00:…| 27359.07057762146|
|2013-08-09 00:00:…|22091.950464248657|
|2013-08-10 00:00:…| 39038.72074890137|
|2013-08-11 00:00:…| 21302.69044494629|
|2013-08-12 00:00:…|41139.110677719116|
|2013-08-13 00:00:…|12468.530212402344|
+--------------------+------------------+
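
The same pushdown query can also be issued through the read.jdbc overload that takes a java.util.Properties object instead of embedding credentials in the URL; a minimal sketch using the same host and credentials as above:

import java.util.Properties

val props = new Properties()
props.setProperty("user", "retail_dba")
props.setProperty("password", "itversity")

// subquery wrapped in parentheses and aliased so the JDBC source can treat it as a table
val query = "(select o.order_date, sum(oi.order_item_subtotal) revenue_per_day " +
  "from orders o join order_items oi on o.order_id = oi.order_item_order_id " +
  "where o.order_status = 'COMPLETE' group by o.order_date order by order_date) q"

val revenuePerDayDF = sqlContext.read.jdbc("jdbc:mysql://nn01.itversity.com:3306/retail_db", query, props)
revenuePerDayDF.show()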


#5

import org.apache.spark.sql.functions._

val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://nn01.itversity.com/retail_db").
  option("driver", "com.mysql.jdbc.Driver").option("dbtable", "orders").option("user", "retail_dba").option("password", "itversity").load()
val orderitems_df = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://nn01.itversity.com/retail_db").
  option("driver", "com.mysql.jdbc.Driver").option("dbtable", "order_items").option("user", "retail_dba").
  option("password", "itversity").load()
val ordersFilter = dataframe_mysql.filter(dataframe_mysql("order_status") === "COMPLETE")
val orders_orderitem_Join = ordersFilter.join(orderitems_df, ordersFilter("order_id") === orderitems_df("order_item_order_id"))

val rev_orders = orders_orderitem_Join.groupBy("order_date").agg(sum("order_item_subtotal")).sort("order_date")
rev_orders.show()
+--------------------+------------------------+
| order_date|sum(order_item_subtotal)|
+--------------------+------------------------+
|2013-07-25 00:00:…| 20030.32|
|2013-07-26 00:00:…| 42165.88|
|2013-07-27 00:00:…| 33156.21000000002|
|2013-07-28 00:00:…| 27012.91|
|2013-07-29 00:00:…| 45898.65000000002|
|2013-07-30 00:00:…| 40590.21|
|2013-07-31 00:00:…| 46503.829999999994|
|2013-08-01 00:00:…| 38231.409999999996|
|2013-08-02 00:00:…| 36633.44|
|2013-08-03 00:00:…| 34828.71|
|2013-08-04 00:00:…| 26161.97|
|2013-08-05 00:00:…| 25804.330000000005|
|2013-08-06 00:00:…| 41413.79|
|2013-08-07 00:00:…| 31533.100000000002|
|2013-08-08 00:00:…| 27359.070000000003|
|2013-08-09 00:00:…| 22091.95|
|2013-08-10 00:00:…| 39038.720000000016|
|2013-08-11 00:00:…| 21302.690000000002|
|2013-08-12 00:00:…| 41139.11000000001|
|2013-08-13 00:00:…| 12468.53|
+--------------------+------------------------+


#6

Query and subsequent processing

package sparkk

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import com.typesafe.config.ConfigFactory

import org.apache.spark.sql.SQLContext

object SparkJdbc {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val appConfig = ConfigFactory.load()

    val master = appConfig.getConfig(args(2)).getString("deployment")

    val conf = new SparkConf().setMaster(master).setAppName("Spark on Retail_db")
    val sc = new SparkContext(conf)

    val inputPath = args(0)
    val outputPath = args(1)

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))

    if (!inputPathExists) {
      println("Input path does not exist")
      return
    }

    if (outputPathExists) {
      fs.delete(new Path(outputPath), true)
    }

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    sqlContext.setConf("spark.sql.shuffle.partitions", "2")

    val connectionUrl = ("url" -> "jdbc:mysql://nn01.itversity.com:3306/retail_db?user=retail_dba&password=itversity")

    val forOrder = Map(connectionUrl, ("dbtable" -> "orders"))
    val forOrderItems = Map(connectionUrl, ("dbtable" -> "order_items"))
    val ordersDF = sqlContext.read.format("jdbc").options(forOrder).load()
    val ordersItemsDF = sqlContext.read.format("jdbc").options(forOrderItems).load()

    ordersDF.registerTempTable("orders")
    ordersItemsDF.registerTempTable("order_items")

    val result = sqlContext.sql("Select order_date, sum(order_item_subtotal) from orders join order_items on order_id = order_item_order_id where order_status = 'COMPLETE' group by order_date")

    result.rdd.saveAsTextFile(outputPath + "/excersize21")

  }
}

Running using spark-submit

spark-submit --class sparkk.SparkJdbc scala-spark-training_2.10-1.0.jar /public/retail_db /user/farhanmisarwala/output/spark-output prod

Sample output - 10 records

[2013-08-03 00:00:00.0,34828.70999999998]
[2013-08-04 00:00:00.0,26161.969999999976]
[2013-08-05 00:00:00.0,25804.32999999998]
[2013-08-06 00:00:00.0,41413.789999999986]
[2013-08-07 00:00:00.0,31533.099999999984]
[2013-08-08 00:00:00.0,27359.069999999985]
[2013-08-09 00:00:00.0,22091.94999999999]
[2013-08-10 00:00:00.0,39038.71999999999]
[2013-08-20 00:00:00.0,32680.04999999998]
[2013-08-21 00:00:00.0,20326.66999999999]
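
The bracketed lines above are the default Row.toString rendering produced by saveAsTextFile; if plain comma-separated lines are preferred, the rows can be mapped before saving (a small variation on the save step, assuming the same result DataFrame and outputPath):

// join each Row's fields with a comma, e.g. 2013-08-03 00:00:00.0,34828.70999999998
result.rdd.map(row => row.mkString(",")).saveAsTextFile(outputPath + "/excersize21")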

#7

import com.typesafe.config.ConfigFactory

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object SparkExercise21 {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load()

    // jdbc:mysql://hn:pn/dn?user=un&password=pwd
    val url = "jdbc:mysql://" + config.getString("databaseHost") + "/" + config.getString("databaseName")
    val username = config.getString("userName")
    val password = config.getString("password")
    val urlFinal = url + "?user=" + username + "&password=" + password

    val conf = new SparkConf().setAppName("Sneh Exercise 21 on DataFrames").setMaster(config.getConfig(args(2)).getString("deployment"))
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    sqlContext.setConf("spark.sql.shuffle.partitions", "2")
    import sqlContext.implicits._

    val ordersDf = sqlContext.read.format("jdbc").options(Map("url" -> urlFinal, "dbtable" -> "orders")).load()
    ordersDf.registerTempTable("orders")
    val orderItemsDf = sqlContext.read.format("jdbc").options(Map("url" -> urlFinal, "dbtable" -> "order_items")).load()
    orderItemsDf.registerTempTable("order_items")

    // same aggregation through the registered temp tables (not used below)
    val resultDf = sqlContext.sql("SELECT order_date, sum(order_item_subtotal) FROM orders join order_items ON order_id = order_item_order_id GROUP BY order_date ORDER BY order_date")

    // push the aggregation down to MySQL as a subquery instead
    val resultDf1 = sqlContext.read.format("jdbc").options(Map("url" -> urlFinal, "dbtable" -> ("(SELECT order_date, sum(order_item_subtotal) Revenue FROM orders join order_items ON order_id = order_item_order_id GROUP BY order_date ORDER BY order_date) ord"))).load()
    resultDf1.show(10)
  }
}


+--------------------+------------------+
| order_date| Revenue|
+--------------------+------------------+
|2013-07-25 00:00:…| 68153.83132743835|
|2013-07-26 00:00:…|136520.17266082764|
|2013-07-27 00:00:…|101074.34193611145|
|2013-07-28 00:00:…| 87123.08192253113|
|2013-07-29 00:00:…|137287.09244918823|
|2013-07-30 00:00:…|102745.62186431885|
|2013-07-31 00:00:…|131878.06256484985|
|2013-08-01 00:00:…|129001.62241744995|
|2013-08-02 00:00:…|109347.00200462341|
|2013-08-03 00:00:…| 95266.89186286926|
+--------------------+------------------+
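
Unlike most of the other answers in this thread, the query above does not filter on order_status, so the totals include every order; restricting it to completed orders only needs an extra predicate in the pushdown query (a sketch, assuming the same urlFinal as above):

val completedDf = sqlContext.read.format("jdbc").options(Map("url" -> urlFinal,
  "dbtable" -> ("(SELECT order_date, sum(order_item_subtotal) Revenue FROM orders JOIN order_items " +
    "ON order_id = order_item_order_id WHERE order_status = 'COMPLETE' " +
    "GROUP BY order_date ORDER BY order_date) ord"))).load()
completedDf.show(10)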


#8

import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// wrapping object added for completeness; the name is illustrative
object JdbcDataFrameRevenue {
  def main(args: Array[String]) {

    val appconf = ConfigFactory.load()
    val conf = new SparkConf().setAppName("jdbc DataFrame").setMaster(appconf.getConfig(args(1)).getString("deployment"))
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val outputPath = args(0)

    val config = ConfigFactory.load()

    // var connection: Connection = null
    val driver = config.getString("driver")
    val url = "jdbc:mysql://" + config.getString("databaseHost") + "/" + config.getString("databaseName") +
      "?user=" + config.getString("username") + "&password=" + config.getString("password")

    // push the aggregation down to MySQL as a subquery
    val query = "(select order_date, sum(order_item_subtotal) revenue_per_day " +
      "from orders join order_items " +
      "on order_id = order_item_order_id " +
      "where order_status = 'COMPLETE' " +
      "group by order_date " +
      "order by order_date limit 50) q"

    val revenue = sqlContext.read.format("jdbc").options(Map("url" -> url, "dbtable" -> query)).load()
    revenue.rdd.saveAsTextFile(outputPath)
  }
}

Output:

[2013-07-25 00:00:00.0,20030.320388793945]
[2013-07-26 00:00:00.0,42165.8807926178]
[2013-07-27 00:00:00.0,33156.210554122925]
[2013-07-28 00:00:00.0,27012.910556793213]
[2013-07-29 00:00:00.0,45898.65076828003]
[2013-07-30 00:00:00.0,40590.21075248718]
[2013-07-31 00:00:00.0,46503.830892562866]
[2013-08-01 00:00:00.0,38231.41069984436]
[2013-08-02 00:00:00.0,36633.44071006775]
[2013-08-03 00:00:00.0,34828.71069145203]
[2013-08-04 00:00:00.0,26161.970418930054]
[2013-08-05 00:00:00.0,25804.330533981323]
[2013-08-06 00:00:00.0,41413.79079246521]
[2013-08-07 00:00:00.0,31533.100637435913]
[2013-08-08 00:00:00.0,27359.07057762146]


#9

package spark.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

object MysqlJDBC {
  def main(args: Array[String]) {

    val appConf = ConfigFactory.load()
    val conf = new SparkConf().setAppName("SparkMysqlJDBC")
    val sc = new SparkContext(conf)

    val fs = FileSystem.get(sc.hadoopConfiguration)

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    sqlContext.setConf("spark.sql.shuffle.partitions", "2")

    val order_df_mysql = sqlContext.read.format("jdbc").options(
      Map("url" -> "jdbc:mysql://nn01.itversity.com:3306/retail_db?user=retail_dba&password=itversity",
        "dbtable" -> "retail_db.orders")).load()

    val order_items_df_mysql = sqlContext.read.format("jdbc").options(
      Map("url" -> "jdbc:mysql://nn01.itversity.com:3306/retail_db?user=retail_dba&password=itversity",
        "dbtable" -> "retail_db.order_items")).load()

    order_df_mysql.registerTempTable("orders")
    order_items_df_mysql.registerTempTable("order_items")

    sqlContext.sql("Select order_date, sum(order_item_subtotal) from orders join order_items on order_id = order_item_order_id where order_status = 'COMPLETE' group by order_date").rdd.saveAsTextFile("/user/mahesh007/sparkTraining/SparkMysqlJDBC")
  }
}

Output:

[2013-07-25 00:00:00.0,20030.32]
[2013-07-26 00:00:00.0,42165.88]
[2013-07-27 00:00:00.0,33156.21000000002]
[2013-07-28 00:00:00.0,27012.91]
[2013-07-29 00:00:00.0,45898.65000000002]
[2013-07-30 00:00:00.0,40590.21]
[2013-07-31 00:00:00.0,46503.829999999994]
[2013-08-01 00:00:00.0,38231.409999999996]
[2013-08-02 00:00:00.0,36633.44]
[2013-08-03 00:00:00.0,34828.71]


#10

Query and subsequent processing

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.typesafe.config._
import org.apache.spark.sql
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

object MySqlJDBCDataframes {

  def main(args: Array[String]) {
    val config = ConfigFactory.load()
    val conf = new SparkConf().setAppName("MySql JDBC Dataframes").setMaster(config.getConfig(args(0)).getString("deployment"))

    val sc = new SparkContext(conf)
    val ssc = new SQLContext(sc)
    ssc.setConf("spark.sql.shuffle.partitions", "2")

    import ssc.implicits._

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val outputpath = new Path(args(1))
    val outputPathExists = fs.exists(outputpath)

    if (outputPathExists) {
      fs.delete(outputpath, true)
    }
    val url = "jdbc:mysql://nn01.itversity.com/retail_db?user=retail_dba&password=itversity"
    //val table = "orders"
    
    val OrdersJdbcDf = ssc.read.format("jdbc").options(Map("url" -> url, "dbtable" -> "orders")).load()
    
    val Order_ItemsJdbcDf = ssc.read.format("jdbc").options(Map("url" -> url, "dbtable" -> "order_items")).load()
    
    OrdersJdbcDf.registerTempTable("orders")
    
    Order_ItemsJdbcDf.registerTempTable("order_items")
    
    val OrderJoinOrderItems = ssc.sql("Select order_date, sum(order_item_subtotal) from orders join order_items on order_id = order_item_order_id where order_status = 'COMPLETE' group by order_date").rdd
    
    OrderJoinOrderItems.saveAsTextFile(outputpath.toString())
    
  }

}

Output:

[2013-08-03 00:00:00.0,34828.70999999998]
[2013-08-04 00:00:00.0,26161.969999999976]
[2013-08-05 00:00:00.0,25804.32999999998]
[2013-08-06 00:00:00.0,41413.789999999986]
[2013-08-07 00:00:00.0,31533.099999999984]