Joining dataframe error

spark-shell

#1

How to get the correct result in a DataFrame?
Piyush · 41 minutes ago
Hi Durga,

Below is the data set:

[cloudera@quickstart ~]$ hadoop fs -cat /user/cloudera/practice4/question3/orders/part-m-00000 |head
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE

[cloudera@quickstart ~]$ hadoop fs -cat /user/cloudera/practice4/question3/customers/part-m-00000 |head
1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521
2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126
3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,00725

** Output requirement: customer_id, customer_fname, order_id and order_status for customers whose order status is like "pending"

I tried the following solution:

case class customers(custId: Integer, name: String)
case class orders(OrdId: Integer, custId: Integer, Status: String)
val CusDF = sc.textFile("/user/cloudera/practice4/question3/customers/").map(x => x.split(",")).map(c => customers(c(0).toInt, c(1))).toDF
val ordDF = sc.textFile("/user/cloudera/practice4/question3/orders/").map(x => x.split(",")).map(o => orders(o(0).toInt, o(2).toInt, o(3))).toDF

val filDF = ordDF.join(CusDF, ordDF("custId") === CusDF("custId")).filter("status like '%PENDING%'")

** While joining, the resulting filDF shows five columns. How do I get the final output with only the four required fields?

scala> val filDF = ordDF.join(CusDF, ordDF("custId") === CusDF("custId")).filter("status like '%PENDING%'")

filDF: org.apache.spark.sql.DataFrame = [OrdId: int, custId: int, Status: string, custId: int, name: string]

scala> filDF.show()
+-----+------+---------------+------+---------+
|OrdId|custId|         Status|custId|     name|
+-----+------+---------------+------+---------+
| 6517|    31|PENDING_PAYMENT|    31|     Mary|
|31933|    31|PENDING_PAYMENT|    31|     Mary|
|41081|    31|PENDING_PAYMENT|    31|     Mary|
|14814|   231|PENDING_PAYMENT|   231|     Mary|
|14039|   431|        PENDING|   431|  Michael|
|24266|   431|        PENDING|   431|  Michael|
|63408|   631|        PENDING|   631|     Mary|
|40285|   831|PENDING_PAYMENT|   831|   Daniel|
| 1978|  1031|PENDING_PAYMENT|  1031|Alexander|
|17798|  1031|PENDING_PAYMENT|  1031|Alexander|
|24259|  1031|PENDING_PAYMENT|  1031|Alexander|
|32227|  1631|PENDING_PAYMENT|  1631|     Mary|
|11650|  1831|PENDING_PAYMENT|  1831|    Jacob|
|20628|  1831|PENDING_PAYMENT|  1831|    Jacob|
|35299|  1831|        PENDING|  1831|    Jacob|
|40957|  1831|        PENDING|  1831|    Jacob|
|52163|  1831|PENDING_PAYMENT|  1831|    Jacob|
| 5016|  2031|PENDING_PAYMENT|  2031|     Mary|
|17404|  2031|PENDING_PAYMENT|  2031|     Mary|
|67676|  2031|PENDING_PAYMENT|  2031|     Mary|
+-----+------+---------------+------+---------+
only showing top 20 rows

Because of the five columns above in the filtered DataFrame, the final result that I am trying to store is wrong.

filDF.rdd.saveAsTextFile("/user/cloudera/p1/q7/output")
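
I guess something like the below might pick only the four required fields, but I am not sure it is the right way to handle the duplicate custId column (just a sketch on my side, using the vals defined above; the output path here is a different one I made up so the save does not collide with the earlier write):

val outDF = filDF.select(CusDF("custId"), CusDF("name"), ordDF("OrdId"), ordDF("Status"))
// Row.mkString(",") turns each Row into a plain comma-separated line instead of [a,b,c,d]
outDF.rdd.map(_.mkString(",")).saveAsTextFile("/user/cloudera/p1/q7/output_selected")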


#2

To avoid duplicate columns, pass the join column as a sequence of strings:
scala> val filDF = ordDF.join(CusDF, Seq("custId")).filter("status like '%PENDING%'")
Please try this and see if it works.


#3

Looks like the join condition is missing… can you provide the correct syntax? Because I tried the below and it is not working:

val filDF = ordDF.join(cusDF, ordDF("custId") === cusDF("custId", Seq("custId")).filter("status like '%PENDING%'")


#4

I have run this command, and it's working:
scala> val filDF = ordDF.join(CusDF, Seq("custId")).filter("status like '%PENDING%'")

filDF: org.apache.spark.sql.DataFrame = [custId: int, OrdId: int, Status: string, name: string]

The above command does not generate a duplicate custId field.
The join is not missing; we are just not using the equality-condition form. Passing Seq("custId") makes Spark do an equi-join on the shared column and keep only one copy of it.
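
From there, getting exactly the four required fields and storing them should be straightforward, since no column name is ambiguous anymore. A minimal sketch based on the same join (the output path below is just an example):

val resultDF = filDF.select("custId", "name", "OrdId", "Status")
// map each Row to a comma-separated line before saving as text
resultDF.rdd.map(_.mkString(",")).saveAsTextFile("/user/cloudera/practice4/question3/solution")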


#5

Have you run this in the Cloudera QuickStart VM v5.10.x? Because it's not working for me.


#6

I am running on Cloudera Hadoop 2.6.0 (CDH 5.8.0).
Can you please show what error you are getting?