Problem 1 - Importing and Exporting Data with transformations


#1

Originally published at: http://www.itversity.com/lessons/problem-1-importing-and-exporting-data-with-transformations/

Let us look at the first problem. Here we will import data from a MySQL table, apply a small transformation, and then export it back to a MySQL table. Problem Statement: Using Sqoop, import the orders table into HDFS under the folder /user/cloudera/problem1/orders. The file should be loaded as an Avro file and use Snappy compression. Using Sqoop, import the order_items table into…


#2

Hi Arun,

Thanks for the post!
I am looking for the Python equivalent of answer 4c:
joinedOrderDataDF…map(lambda x : ((float(x["order_date"]),str(x["order_status"])) , (float(x["order_item_subtotal"]),str(x["order_id"])) )).combineByKey((lambda x : (x[0],set(x[1]))), (lambda x,y:(x[0]+y[0], x[1].add(y[1]))),(lambda x,y: (x[0]+y[0],x[1].add(y[1])))).take(11)

I am getting the following error:
'NoneType' object has no attribute 'add'
It seems that Python's set add, unlike + on a Scala set, updates the set in place and returns None instead of the updated set. Please let me know of a workaround.
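
One possible workaround, sketched under a few assumptions: because set.add returns None, each combiner step can build a new set with the union operator | instead. Note also that set(x[1]) on a string produces a set of its characters, so the sketch wraps the order id as {order_id}. The function names and sample values below are hypothetical; the three functions are meant to stand in for the three lambdas passed to combineByKey.

```python
from functools import reduce

def create_combiner(value):
    # value is (subtotal, order_id); start a one-element set of order ids
    subtotal, order_id = value
    return (subtotal, {order_id})

def merge_value(acc, value):
    # Add one more value to a partition-local accumulator.
    # The | operator returns a new set, unlike set.add, which returns None.
    subtotal, order_id = value
    return (acc[0] + subtotal, acc[1] | {order_id})

def merge_combiners(a, b):
    # Merge two accumulators coming from different partitions.
    return (a[0] + b[0], a[1] | b[1])

# Local check without Spark, using made-up values for a single key.
values = [(10.0, "1"), (5.5, "1"), (4.5, "2")]
combined = reduce(merge_value, values[1:], create_combiner(values[0]))
total, order_ids = combined
print(total, len(order_ids))  # 20.0 2

# With Spark, assuming pairs is the keyed RDD produced by the map above:
# pairs.combineByKey(create_combiner, merge_value, merge_combiners)
```

The size of the set then gives the number of distinct orders per key.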


#3

I am trying to use aggregateByKey instead of combineByKey, but I have an issue with the count; everything else looks good. Could you please let me know what I am missing here?

val ordersDF = sqlContext.read.avro("/user/imthiyas90/problem1/orders")
val orderitemDF = sqlContext.read.avro("/user/imthiyas90/problem1/order_items")

ordersDF.registerTempTable("orders")

orderitemDF.registerTempTable("order_items")

val joinedDF = ordersDF.join(orderitemDF, ordersDF("order_id") === orderitemDF("order_item_order_id"))

val combine_agg = joinedDF.map(e => ((e(1).toString,e(3).toString),(e(8).toString.toDouble,e(0).toString.toInt))).aggregateByKey((0.0,0))((x:(Double,Int),y:(Double,Int)) =>(x._1+y._1,x._2+1),(x:(Double,Int),y:(Double,Int)) =>(x._1+y._1,x._2+y._2)).map(x => (x._1._1,x._1._2,x._2._1,x._2._2)).toDF.orderBy(col("_1").desc,col("_2"),col("_3").desc,col("_4"))
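
A possible reason for the count mismatch, offered as a guess: the sequence function above adds 1 for every joined row, so it counts order items, whereas the set-based combineByKey version quoted in post #2 counts distinct order ids. The sketch below is in Python, uses made-up sample rows and hypothetical function names, and contrasts the two approaches on a single key.

```python
from functools import reduce

# Made-up (subtotal, order_id) values for one (order_date, order_status) key.
# Order 1 has two items, order 2 has one.
rows = [(100.0, 1), (50.0, 1), (25.0, 2)]

def seq_count_rows(acc, value):
    # Mirrors (x._1 + y._1, x._2 + 1): adds 1 per joined row.
    return (acc[0] + value[0], acc[1] + 1)

def seq_collect_ids(acc, value):
    # Collects order ids into a set so duplicates collapse.
    return (acc[0] + value[0], acc[1] | {value[1]})

def comb_collect_ids(a, b):
    # Combines two partial results by summing totals and unioning id sets.
    return (a[0] + b[0], a[1] | b[1])

row_total, row_count = reduce(seq_count_rows, rows, (0.0, 0))
id_total, ids = reduce(seq_collect_ids, rows, (0.0, frozenset()))
print(row_count, len(ids))  # 3 2
```

If the expected count is the number of distinct orders, the accumulator would need to carry a set of order ids (with an empty set in the zero value) rather than an integer counter.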