import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object MysqlJoin {
  def main(args: Array[String]) {
    // Spark context
    val conf = new SparkConf().setAppName("RevPerDayDF")
    val sc = new SparkContext(conf)

    // SQL context, with shuffle partitions kept low for a small dataset
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    sqlContext.setConf("spark.sql.shuffle.partitions", "2")
    import sqlContext.implicits._

    // Load the orders and order_items tables from MySQL over JDBC
    val orderDF = sqlContext.read.format("jdbc").options(
      Map("url" -> "jdbc:mysql://nn01.itversity.com:3306/retail_db?user=retail_dba&password=itversity",
          "dbtable" -> "retail_db.orders")).load()
    val orderItemDF = sqlContext.read.format("jdbc").options(
      Map("url" -> "jdbc:mysql://nn01.itversity.com:3306/retail_db?user=retail_dba&password=itversity",
          "dbtable" -> "retail_db.order_items")).load()

    // Register temp tables so the join can be written in SQL
    orderDF.registerTempTable("orders")
    orderItemDF.registerTempTable("order_items")

    // Join orders with order_items, keep only COMPLETE orders,
    // compute revenue per day, and save the result as text
    sqlContext.sql(
      """SELECT order_date, SUM(order_item_subtotal)
        |FROM orders JOIN order_items ON order_id = order_item_order_id
        |WHERE order_status = 'COMPLETE'
        |GROUP BY order_date""".stripMargin)
      .rdd.saveAsTextFile("/user/saswat232/retail_db/DFSQLJoin")
  }
}
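
For comparison, the same revenue-per-day computation can be expressed with the DataFrame API instead of a SQL string. This is a minimal sketch, assuming orderDF and orderItemDF are the JDBC DataFrames loaded above; the output path DFAPIJoin is illustrative, not from the original job. The output shown below is from the SQL version.

// DataFrame API version of the same join and aggregation (sketch;
// assumes orderDF and orderItemDF are the JDBC DataFrames above)
val revPerDay = orderDF
  .filter($"order_status" === "COMPLETE")
  .join(orderItemDF, $"order_id" === $"order_item_order_id")
  .groupBy($"order_date")
  .agg(sum($"order_item_subtotal"))

// Illustrative output path, not the one used by the SQL version
revPerDay.rdd.saveAsTextFile("/user/saswat232/retail_db/DFAPIJoin")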
Output ::
[2013-07-25 00:00:00.0,20030.320388793945]
[2013-07-27 00:00:00.0,33156.210554122925]
[2013-07-29 00:00:00.0,45898.65076828003]
[2013-07-30 00:00:00.0,40590.21075248718]
[2013-08-02 00:00:00.0,36633.44071006775]
[2013-08-04 00:00:00.0,26161.970418930054]
[2013-08-06 00:00:00.0,41413.79079246521]
[2013-08-08 00:00:00.0,27359.07057762146]
[2013-08-11 00:00:00.0,21302.69044494629]
[2013-08-13 00:00:00.0,12468.530212402344]
[2013-08-15 00:00:00.0,37473.12078857422]
[2013-08-17 00:00:00.0,52940.94108963013]
[2013-08-19 00:00:00.0,10603.870204925537]
[2013-08-20 00:00:00.0,32680.050630569458]
[2013-08-22 00:00:00.0,30542.99054336548]
[2013-08-24 00:00:00.0,39566.76083564758]
[2013-08-26 00:00:00.0,32764.19061088562]
[2013-08-28 00:00:00.0,15508.830348968506]
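
The long decimal tails (e.g. 20030.320388793945) are the usual artifact of summing Double subtotals. If cleaner figures are wanted, the aggregate can be rounded in the query itself; a sketch, assuming Spark 1.5+ where ROUND is available in Spark SQL:

// Same query with per-day revenue rounded to 2 decimal places
// (assumes Spark 1.5+, where ROUND is available in Spark SQL)
val rounded = sqlContext.sql(
  """SELECT order_date, ROUND(SUM(order_item_subtotal), 2) AS revenue
    |FROM orders JOIN order_items ON order_id = order_item_order_id
    |WHERE order_status = 'COMPLETE'
    |GROUP BY order_date""".stripMargin)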