Join in Spark when there are duplicate keys in both RDDs

Hi everyone,
I am trying to join two RDDs.

The first table contains employees and their respective departments, along with the time they spent in each department, e.g.

10001,development,to_time,from_time
10001,research,to_time,from_time

From this data I made an RDD containing (emp_no, dept_name).

The second table contains employees and the titles they held during a time period, e.g.

10001,staff,to_time,from_time
10001,senior staff,to_time,from_time

From this I made an RDD which has (emp_no, title).

As you can see, both RDDs have multiple entries for the same key.

When I join these RDDs and try to print the result, Spark freezes partway through. I waited for more than 15 minutes and still got no result, and these datasets are not that large.

When I remove the duplicate keys from both RDDs, I get the result within a minute.

Can someone explain what the issue is when I try to join two RDDs with duplicate keys?

If you have duplicates in the join key on both sides, whether RDDs or datasets in general, the join produces a Cartesian product per key: every matching record on the left is paired with every matching record on the right. That explodes the data volume, and performance suffers accordingly.
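For illustration, a minimal PySpark sketch of the effect (the RDD contents here are made up): a key that appears m times on the left and n times on the right produces m * n joined rows.

from pyspark import SparkContext

sc = SparkContext(appName="dup-key-join-demo")

# Hypothetical key with 3 entries on the left and 4 on the right.
left = sc.parallelize([("10001", "development"), ("10001", "research"), ("10001", "sales")])
right = sc.parallelize([("10001", "staff")] * 4)

# Each left record pairs with each right record: 3 * 4 = 12 output rows for this one key.
print(left.join(right).count())  # 12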

@itversity I agree with you that performance degrades with the volume of the inputs. We could go with cogroup() instead, since @mahesh_mogal wants to keep the duplicate keys without losing any of the source data.
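For illustration, a cogroup() sketch on a slice of the sample data (PySpark; deptRDD and titleRDD are assumed names): cogroup gathers all values from both RDDs per key in a single record instead of pairing every combination.

# deptRDD and titleRDD stand in for the two pair RDDs from the question.
deptRDD = sc.parallelize([("100001", "Research"), ("100001", "Development")])
titleRDD = sc.parallelize([("100001", "Engineer")])

# cogroup returns one record per key: (key, (iterable of depts, iterable of titles)).
for key, (depts, titles) in deptRDD.cogroup(titleRDD).collect():
    print(key, list(depts), list(titles))
# 100001 ['Research', 'Development'] ['Engineer']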


Where are you running this? On the lab?

Try launching spark-shell --master yarn

@mahesh_mogal
What is the expected result?
Is this sample data you are assuming, or a real-world example?
From a DW perspective this looks like a classic Slowly Changing Dimension case, where the transactional table contains only the most current information about an employee, and the changes in his employment (in this case title changes and department changes) are tracked in another table that contains the duplicates.

Could you please post your expected result from this join? Maybe I am not clear on the question.

Hi,
@email2dgk I tried cogroup too, but it still freezes for a long time without giving any result.

@itversity yes, I am trying to run it on the lab using pyspark. I will try spark-shell for it.

@ramesh.devarakonda
It is sample data; I picked it from the MySQL website.
The problem statement, which I made up myself, is: the number of distinct senior engineers who worked in the development department over the years.
This is the first sample dataset:
(u'100000', u'Research')
(u'100001', u'Research')
(u'100001', u'Development')
(u'100002', u'Research')
(u'100003', u'Development')

The second dataset:

(u'100000', u'Senior Staff')
(u'100001', u'Engineer')
(u'100002', u'Senior Staff')
(u'100003', u'Engineer')
(u'100003', u'Senior Engineer')

Now, to remove the duplicate keys, I used reduceByKey()

d1 = deptNameEmpMap.reduceByKey(lambda x, y: x + "," + y)

for both datasets and then joined them.
I got the following result:

(u'100000', (u'Senior Staff', u'Research'))
(u'100001', (u'Engineer', u'Research,Development'))
(u'100002', (u'Senior Staff', u'Research'))
(u'100003', (u'Engineer,Senior Engineer', u'Development'))
(u'100004', (u'Staff', u'Sales'))

The expected result is:

(u'100000', (u'Senior Staff', u'Research'))
(u'100001', (u'Engineer', u'Development'))
(u'100001', (u'Engineer', u'Research'))
(u'100002', (u'Senior Staff', u'Research'))
(u'100003', (u'Engineer', u'Development'))
(u'100003', (u'Senior Engineer', u'Development'))
(u'100004', (u'Staff', u'Sales'))

How do I get this result from the result I have obtained?
I know I have to use flatMap(); I tried it but still could not get the result I want.

Or is there another way to get the desired result for my case?

Thanks for all of your replies.
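(For reference, a sketch of one way to expand the comma-concatenated join result back into individual rows with flatMap; `joined` is an assumed name for the RDD produced by the join above, and the idea is simply to split both sides and emit every (title, dept) pair per key.)

# 'joined' is assumed to hold records like
# (u'100003', (u'Engineer,Senior Engineer', u'Development'))
expanded = joined.flatMap(
    lambda kv: [(kv[0], (title, dept))
                for title in kv[1][0].split(",")
                for dept in kv[1][1].split(",")])

# expanded now has one record per (title, dept) combination, e.g.
# (u'100003', (u'Engineer', u'Development'))
# (u'100003', (u'Senior Engineer', u'Development'))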

@mahesh_mogal
Your final output, per your request "number of distinct senior engineers who worked in the development department", is:
(u'100003', (u'Senior Engineer', u'Development'))

Your problem to be solved: "number of distinct senior engineers who worked in the development department over the years."

I am assuming you are trying to solve this problem with Python; I would prefer you think about it from a database perspective.

Filter Dataset 1 on "Development":

(u'100001', u'Development')
(u'100003', u'Development')

Filter Dataset 2 on "Senior Engineer":

(u'100003', u'Senior Engineer')

Join Dataset 1 and Dataset 2 on the key (ID):

(u'100003', u'Development')
(u'100003', u'Senior Engineer')

Now reduce by key:

(u'100003', u'Development', u'Senior Engineer')

Let me know if this is what you are looking for.

If yes: when doing joins, filter early, before the join. This reduces performance problems caused by joining unnecessary data.

Here is a sketch of the idea in a Scala context:

scala> val deptList = List((100,"Research"), (101,"Research"), (101,"Development"), (102,"Research"), (103,"Development"))
deptList: List[(Int, String)] = List((100,Research), (101,Research), (101,Development), (102,Research), (103,Development))

scala> val desigList = List((100,"Senior Staff"), (101,"Engineer"), (102,"Senior Engineer"), (103,"Engineer"), (103,"Senior Engineer"))
desigList: List[(Int, String)] = List((100,Senior Staff), (101,Engineer), (102,Senior Engineer), (103,Engineer), (103,Senior Engineer))

scala> val deptList_Filtered = deptList.filter(_._2 == "Development")
deptList_Filtered: List[(Int, String)] = List((101,Development), (103,Development))

scala> val desigList_Filtered = desigList.filter(_._2 == "Senior Engineer")
desigList_Filtered: List[(Int, String)] = List((102,Senior Engineer), (103,Senior Engineer))

scala> val joined = for ((id1, dept) <- deptList_Filtered; (id2, title) <- desigList_Filtered; if id1 == id2) yield (id1, (dept, title))
joined: List[(Int, (String, String))] = List((103,(Development,Senior Engineer)))

scala> joined.foreach(println)
(103,(Development,Senior Engineer))

The final output is:

(103,(Development,Senior Engineer))
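And roughly the same filter-then-join approach in PySpark, since the original question uses pyspark (a sketch; deptRDD and titleRDD are assumed names for the two pair RDDs):

# Assumed names: deptRDD = (emp_no, dept_name), titleRDD = (emp_no, title).
devRDD = deptRDD.filter(lambda kv: kv[1] == "Development")
seniorRDD = titleRDD.filter(lambda kv: kv[1] == "Senior Engineer")

# Join the two small, filtered RDDs, then count distinct employee numbers.
joined = devRDD.join(seniorRDD)           # (emp_no, ("Development", "Senior Engineer"))
count = joined.keys().distinct().count()  # distinct senior engineers in Development
print(count)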

If it is pyspark, just run pyspark --master yarn-client
By default it runs in local mode.