Spark RDD join fails

apache-spark

#1

I have three files as below:
EmployeeName.csv with the fields (id, name)
E001,Emp1
E002,Emp2
E003,Emp3
E004,Emp4

EmployeeManager.csv (id, managerName)
E001,MGR1
E002,MGR2
E003,MGR3
E004,MGR4

Employeesalary.csv (id, Salary)
E001,25000
E002,45666
E003,45646
E004,85464

Using Spark and its API, generate a joined output as below and save it as a text file (separated by commas) for final distribution; the output must be sorted by id.
Id,name,salary,managerName
val empRDD = sc.textFile("/user/kumsavarthami/employee/employe_name.csv")
empRDD.take(5).foreach(println)
// split each line once instead of twice
val empPairRDD = empRDD.map { line => val f = line.split(","); (f(0), f(1)) }
empPairRDD.take(5).foreach(println)
val mgrRDD = sc.textFile("/user/kumsavarthami/employee/manager.csv")
mgrRDD.take(5).foreach(println)
val mgrPairRDD = mgrRDD.map { line => val f = line.split(","); (f(0), f(1)) }
mgrPairRDD.take(5).foreach(println)
val salRDD = sc.textFile("/user/kumsavarthami/employee/salary.csv")
salRDD.take(5).foreach(println)
val salPairRDD = salRDD.map { line => val f = line.split(","); (f(0), f(1)) }
salPairRDD.take(5).foreach(println)
val joinRDD = empPairRDD.join(mgrPairRDD).join(salPairRDD)
joinRDD.take(5).foreach(println)
val joinedData = joinRDD.sortByKey()
joinedData.take(5).foreach(println)
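Note that a double join nests its values: empPairRDD.join(mgrPairRDD).join(salPairRDD) yields records of type (id, ((name, managerName), salary)), so producing the requested Id,name,salary,managerName lines needs a flattening map before saving. A minimal plain-Scala sketch of that reshaping on one hypothetical record (no SparkContext needed):

```scala
// Shape of one record after empPairRDD.join(mgrPairRDD).join(salPairRDD):
// (id, ((name, managerName), salary))
val record: (String, ((String, String), String)) = ("E001", (("Emp1", "MGR1"), "25000"))

// Flatten to the requested "Id,name,salary,managerName" layout
val line = record match {
  case (id, ((name, mgr), sal)) => s"$id,$name,$sal,$mgr"
}

println(line)  // E001,Emp1,25000,MGR1
```

In Spark this would be applied to the whole sorted RDD, e.g. joinedData.map { case (id, ((name, mgr), sal)) => s"$id,$name,$sal,$mgr" }.saveAsTextFile(...) with your own output path.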

Issue:
When joining as below:
val joinRDD = empPairRDD.join(mgrPairRDD).join(salPairRDD)
joinRDD.take(5).foreach(println)
take(5) fails with “Caused by: java.lang.ArrayIndexOutOfBoundsException: 1”.
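For context, this exception typically means some input line has no comma (for example a blank trailing line in one of the CSVs), so split(",")(1) is out of bounds; because map is lazy, the error only surfaces when take(5) forces evaluation, which is why it appears to come from the join. A plain-Scala sketch of the failure mode and a defensive guard (the sample data here is hypothetical):

```scala
// Hypothetical input, including a blank line such as a trailing newline in a CSV.
val lines = Seq("E001,Emp1", "E002,Emp2", "", "E003,Emp3")

// "".split(",") yields Array(""), which has no index 1, so the naive
// x.split(",")(1) throws ArrayIndexOutOfBoundsException: 1 on the blank line.

// Defensive version: split once, keep only well-formed rows.
val pairs = lines
  .map(_.split(","))
  .filter(_.length >= 2)
  .map(f => (f(0).trim, f(1).trim))

pairs.foreach(println)
```

In the question's pipeline the same guard would be applied to each RDD before pairing, e.g. empRDD.map(_.split(",")).filter(_.length >= 2).map(f => (f(0), f(1))).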


#2

Can someone help resolve this?