Need to split the join output

apache-spark

#1

Dear all,

I have two tables (three columns each). I did a Spark join and got a pair RDD; now I need to break up that RDD and extract some specific values from it.

Input

customer

101,James,4
101,James,4
102,Sam,4
103,Shane,4
104,Sam,4

sales

4,hloan1,1500000.0
4,hloan1,1500000.0
4,hloan1,1500000.0
4,hloan1,1500000.0
4,hloan1,1500000.0

code

val customers = sc.textFile("/user/xxx/customer.txt")
val sales = sc.textFile("/user/xxx/sales.txt")
case class Customer(cid: Int, cname: String, clid: Int)
case class Sales(clid: Int, lname: String, amt: Double)
// key each customer row by clid (3rd column)
val custMap = customers.map(r => r.split(",")).map(rec => (rec(2), Customer(rec(0).toInt, rec(1), rec(2).toInt)))
// key each sales row by clid (1st column); lname is a String, so no .toInt on rec(1)
val salesMap = sales.map(r => r.split(",")).map(rec => (rec(0), Sales(rec(0).toInt, rec(1), rec(2).toDouble)))
val custsaleJoin = custMap.join(salesMap)

This is what the output looks like:

scala> custsaleJoin.take(5).foreach(println)
(4,(Customer(101,James,4),Sales(4,hloan1,1500000.0)))
(4,(Customer(101,James,4),Sales(4,hloan1,1500000.0)))
(4,(Customer(102,Sam,4),Sales(4,hloan1,1500000.0)))
(4,(Customer(103,Shane,4),Sales(4,hloan1,1500000.0)))
(4,(Customer(104,Sam,4),Sales(4,hloan1,1500000.0)))


Now I need to extract the first and second column values (101, James) from Customer and the last value (1500000.0) from Sales, and combine them as:
101,James,1500000.0
102,Sam,1500000.0

I need to do this in core Spark (RDDs) only, not with DataFrames.

Regards
Venkatesh


#2

Can you explain your expected output? Your input data does not seem to demonstrate what you are looking for, or else I am misunderstanding your problem. Since the sales id is 4 in every customer record, the join pairs every customer row with every sales row (effectively a Cartesian product), so 5 customer rows × 5 sales rows gives 25 output rows.
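To see where that row count comes from, here is a minimal local sketch in plain Scala (no Spark needed). `innerJoin` is a hypothetical helper that mimics the per-key pairing of a pair-RDD `join`; the data mirrors the sample files, reduced to the fields that matter.

```scala
// Minimal local sketch: plain Scala collections standing in for pair RDDs.
object JoinDemo {
  // Hypothetical helper mimicking an inner join on key: every left value with
  // key k is paired with every right value that has the same key k.
  def innerJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] =
    for {
      (k1, a) <- left
      (k2, b) <- right
      if k1 == k2
    } yield (k1, (a, b))

  // Keyed by clid; all five customer rows share clid = 4, as in the sample input.
  val customers: Seq[(Int, (Int, String))] = Seq(
    (4, (101, "James")), (4, (101, "James")), (4, (102, "Sam")),
    (4, (103, "Shane")), (4, (104, "Sam")))

  // Five identical sales rows, also keyed by 4 (only the amount is kept here).
  val sales: Seq[(Int, Double)] = Seq.fill(5)((4, 1500000.0))

  // 5 customer rows x 5 sales rows with the same key => 25 joined rows.
  val joined: Seq[(Int, ((Int, String), Double))] = innerJoin(customers, sales)
}
```

Spark's `join` on pair RDDs pairs values per key in the same way, so on the real files `custJoinSal.count()` should likewise come out to 25.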

You can try this:

val cust = sc.textFile("/user/kannanvijay/practice20/input/customer.txt")
val sal = sc.textFile("/user/kannanvijay/practice20/input/sales.txt")

val custmap = cust.map(rec => {
  val c = rec.split(',')
  // key by clid, keep (cid, cname, clid) as the value
  (c(2).toInt, (c(0).toInt, c(1), c(2).toInt))
})

val salmap = sal.map(rec => {
  val s = rec.split(',')
  // key by clid, keep (clid, lname, amt) as the value
  (s(0).toInt, (s(0).toInt, s(1), s(2).toDouble))
})

val custJoinSal = custmap.join(salmap)

//Results will be like this (4, ((101,James,4),(4,hloan1,1500000.0)))

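val res = custJoinSal.map(rec => {
  // rec = (clid, ((cid, cname, clid), (clid, lname, amt))): keep cid, cname and amt
  (rec._2._1._1, rec._2._1._2, rec._2._2._3)
}).distinct // drop the duplicated customer rows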

res.take(10).foreach(println)
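If you keep the case classes from the question, pattern matching on the joined pair may read more easily than chained `._2._1._1` accessors. The sketch below runs on a plain Scala `Seq` standing in for the joined RDD (the rows are copied from the sample output in the question); `ExtractDemo` is just an illustrative wrapper. The `map { case ... }` and `distinct` calls have the same shape on an `RDD`.

```scala
// Case classes as defined in the question.
case class Customer(cid: Int, cname: String, clid: Int)
case class Sales(clid: Int, lname: String, amt: Double)

object ExtractDemo {
  // Local stand-in for custsaleJoin: RDD[(String, (Customer, Sales))].
  // Rows copied from the sample output in the question.
  val joined: Seq[(String, (Customer, Sales))] = Seq(
    ("4", (Customer(101, "James", 4), Sales(4, "hloan1", 1500000.0))),
    ("4", (Customer(101, "James", 4), Sales(4, "hloan1", 1500000.0))),
    ("4", (Customer(102, "Sam", 4), Sales(4, "hloan1", 1500000.0))),
    ("4", (Customer(103, "Shane", 4), Sales(4, "hloan1", 1500000.0))),
    ("4", (Customer(104, "Sam", 4), Sales(4, "hloan1", 1500000.0))))

  // Destructure (key, (customer, sale)) and keep cid, cname and amt;
  // distinct drops the duplicated 101,James row.
  val result: Seq[(Int, String, Double)] =
    joined.map { case (_, (c, s)) => (c.cid, c.cname, s.amt) }.distinct

  // Render each triple as a comma-separated line, e.g. "101,James,1500000.0".
  val lines: Seq[String] = result.map { case (cid, name, amt) => s"$cid,$name,$amt" }
}
```

One difference to keep in mind: `distinct()` on an RDD shuffles the data and does not preserve input order, so the rows may come back in a different order; add a `sortBy` on the customer id if order matters.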