How to add a header row before the data is written to HDFS after processing in PySpark


#1

Hi all,

Is there any way to include a header row in the output file after processing is completed in the Spark engine?

For example, I have to group employees by department, and I will use Spark to do that. Then I have to write the result back to HDFS.

When I write the file back, I want to add headings such as 'department_name' and 'employee_name' before the employee data:

department_name,employee_name
12,abc
12,xyx
13,rtg
14,fde

Regards,
Vibhor.


#2

@vibhoroffice While saving the file, you can use the header option to add a header row to it:

df.write.format("csv").option("header", "true").save("hdfs://location/to/save/csv")


#3

@annapurna, when I use the approach mentioned above, I do not get the desired result.

My code is below. Can you please let me know where I am going wrong? The code works fine apart from the last line, where I try to save the result by using the dataframe. Please help me.

#create the RDD out of the file
technologyrdd=sc.textFile("/user/vibhoroffice/problem45/technology.txt")
salaryrdd=sc.textFile("/user/vibhoroffice/problem45/salary.txt")
#find the header and remove the same.
header1=technologyrdd.first()
header2=salaryrdd.first()
technologyrddfilter=technologyrdd.filter(lambda x:x!=header1)
salaryrddfilter=salaryrdd.filter(lambda x:x!=header2)
#map the data
from pyspark.sql import Row
technologymap=technologyrddfilter.map(lambda x:(Row(first_name=x.split(",")[0],last_name=x.split(",")[1],technology=x.split(",")[2]))).toDF()
salarymap=salaryrddfilter.map(lambda x:(Row(first_name=x.split(",")[0],last_name=x.split(",")[1],salary=x.split(",")[2]))).toDF()
cond=[technologymap.first_name==salarymap.first_name,technologymap.last_name==salarymap.last_name]
datajoin=technologymap.join(salarymap,cond,'outer').select(technologymap.first_name,technologymap.last_name,technologymap.technology,salarymap.salary).collect()
datajoin.write.format("csv").save("/user/vibhoroffice/problem45/result")