Migrate code from PySpark to AWS Glue

pyspark
apache-spark

#1

Hi BigData Gurus.

Can someone please help me understand how to migrate the two PySpark functions below, queueFiles() and loadprisma() (originally written on an HDP VM), to AWS Glue?
I need this code to run as an AWS Glue job, so I would like to know:
- Which libraries I need to set up in the AWS Glue job
- How I can run my code in Glue the same way I run it in Spark on the HDP virtual machine

On my HDP VM I launch pyspark with the following dependencies:

pyspark --driver-class-path /root/mysql-driver/mysql-connector-java-5.1.45-bin.jar --packages com.databricks:spark-csv_2.10:1.0.3
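
From what I have read in the Glue docs, the job-script boilerplate would look roughly like this (a minimal sketch of a standard Glue Spark 2.x job, so corrections are welcome):

import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext

# Glue provides the Spark runtime; no shell flags like --driver-class-path here
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
sqlContext = spark  # SparkSession exposes .read and .sql, so the functions below work unchanged
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# queueFiles(...) / loadprisma() would be called here

job.commit()

If I understand correctly, the MySQL connector jar gets uploaded to S3 and referenced in the job's "Dependent jars path" (the --extra-jars parameter), and the com.databricks:spark-csv package is not needed at all, since Glue runs Spark 2.x, where CSV is a built-in data source.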

Thank you in advance.

def queueFiles(processNM, filelocation):
    import csv
    from pyspark.sql.functions import lower, col
    # Example arguments:
    # processNM = "sizmek"
    # filelocation = "file:///root/webcosts/workdir/sizmek/"
    # Establish a connection to the DB and register the lookup tables as temp views
    mysql_url_webcosts = "jdbc:mysql://hostname/dbname"
    for tbl in ["etl_source_file_assignments", "vendors", "vendor_systems", "data_sources"]:
        sqlContext.read.format("jdbc") \
            .option("url", mysql_url_webcosts) \
            .option("driver", "com.mysql.jdbc.Driver") \
            .option("dbtable", tbl) \
            .option("user", "user") \
            .option("password", "password") \
            .load().createOrReplaceTempView(tbl + "_table")
    etl_source_file_assignments_df = sqlContext.sql(
        "select esfa.ETL_FILE_ASSIGNMENT_ID, esfa.SYSTEM_ID, vs.SYSTEM_NAME, "
        "esfa.DATA_SOURCE_ID, ds.DATA_SOURCE_NM, esfa.VENDOR_ID, v.VENDOR_NM, "
        "esfa.FILENAME_PREFIX, esfa.ACTIVE "
        "from etl_source_file_assignments_table esfa "
        "LEFT JOIN vendors_table v ON esfa.VENDOR_ID = v.VENDOR_ID "
        "LEFT JOIN vendor_systems_table vs ON esfa.SYSTEM_ID = vs.SYSTEM_ID "
        "LEFT JOIN data_sources_table ds ON esfa.DATA_SOURCE_ID = ds.DATA_SOURCE_ID").cache()
    # Create an RDD of (path, content) pairs from the source files
    file = sc.wholeTextFiles(filelocation)
    ## Map the file name to a short lookup key per process
    if processNM == "sizmek":
        print("Starting Sizmek Process...")
        file_name_df = file.map(lambda x: x[0]).map(lambda d: d[35:200]).map(lambda f: (f[12:17], f)).toDF()
        #file_name_df = file.map(lambda x: x[0]).map(lambda d: d[35:200]).map(lambda f: (f[f.find('[', 0, 120) + 1: f.find('[', 0, 120) + 6], f)).toDF()
    elif processNM == "dcm":
        print("Starting DCM Process...")
        file_name_df = file.map(lambda s: s[0]).map(lambda d: d[32:200]).map(lambda f: (f[0:13], f)).toDF()
    elif processNM == "prisma":
        print("Starting PRISMA Process...")
        file_name_df = file.map(lambda s: s[0]).map(lambda d: d[35:200]).map(lambda f: ('prisma', f)).toDF()
    elif processNM == "prismam":
        print("Starting PRISMA Monthly Process...")
        file_name_df = file.map(lambda s: s[0]).map(lambda d: d[36:200]).map(lambda f: ('prisma', f)).toDF()
    elif processNM == "digitas":
        print("Starting DIGITAS Process...")
        file_name_df = file.map(lambda s: s[0]).map(lambda d: d[36:200]).map(lambda f: (f[0:4], f)).toDF()
    elif processNM == "havas":
        print("Starting HAVAS Process...")
        # NOTE: the original code never builds file_name_df for HAVAS, so this
        # branch hits a NameError at the select below
    # Lowercase the short file-name key
    file_name_rdd = file_name_df.select(lower(col('_1')).alias("sname"), col('_2').alias("lname")) \
        .rdd.map(lambda x: (str(x[0]), str(x[1])))
    etl_source_file_assignments_df_rdd = etl_source_file_assignments_df.rdd \
        .map(lambda x: (x[0], x[1], x[2], x[3], str(x[4]), x[5], x[6], x[7], x[8]))
    # Create a list to store file information, sniffing each file's delimiter
    new_list = []
    for row in file_name_rdd.collect():
        output = etl_source_file_assignments_df_rdd.filter(lambda x: row[0] in x[7])
        for i in output.collect():
            file_line = sc.textFile(filelocation + row[1]).filter(lambda x: 'supplier_code' not in x).take(1)
            sniffer = csv.Sniffer()
            sniff_delim = sniffer.sniff(str(file_line))
            if sniff_delim.delimiter == ' ' or \
                    sc.parallelize(file_line).map(lambda x: '\t' if '\t' in x[0:20] else 'other').take(1) == ['\t']:
                delim_var = "t"
            elif sniff_delim.delimiter == ',' or \
                    sc.parallelize(file_line).map(lambda x: ',' if ',' in x[0:20] else 'other').take(1) == [',']:
                delim_var = "c"
            else:
                delim_var = "p"
            # Build the metadata row once, then log it and queue it
            file_row = ",".join(["0", str(i[1]), i[2], str(i[3]), i[4], str(i[5]), str(i[6]),
                                 filelocation, row[1], "0", "", "", "0", processNM, delim_var])
            print(file_row)
            new_list.append(file_row)
    dataout = sc.parallelize(new_list)
    # Split each row once and cast the numeric columns
    dataoutrdd = dataout.map(lambda d: d.split(",")).map(
        lambda p: (int(p[0]), int(p[1]), p[2], int(p[3]), p[4], int(p[5]), p[6], p[7],
                   p[8], int(p[9]), p[10], p[11], int(p[12]), p[13], str(p[14])))
    # Create a dataframe of the file information
    dataoutdf = spark.createDataFrame(dataoutrdd, [
        'DATA_SOURCE_FILE_ID', 'SYSTEM_ID', 'SYSTEM_NAME', 'DATA_SOURCE_ID', 'DATA_SOURCE_NM',
        'VENDOR_ID', 'VENDOR_NM', 'FILEPATH', 'FILENAME', 'FILESIZE', 'FILE_LOAD_START',
        'FILE_LOAD_END', 'TRANSFORMED', 'PROCESS_NAME', 'DELIMITER'])
    # Save the file information to the DB
    mysql_url = "jdbc:mysql://hostname/dbname?user=user&password=password"
    dataoutdf.write.jdbc(mysql_url, table="data_source_files", mode="append")
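
For the JDBC parts, I assume the sqlContext.read.format("jdbc") calls above will work on Glue once the driver jar is attached, but Glue also has its own DynamicFrame reader/writer. A sketch of what I believe the equivalent would be, assuming a Glue JDBC connection named webcosts-mysql (a made-up name) is attached to the job:

from awsglue.dynamicframe import DynamicFrame

# Read one of the MySQL lookup tables through Glue connection options
vendors_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="mysql",
    connection_options={
        "url": "jdbc:mysql://hostname/dbname",  # same placeholder host/db as above
        "dbtable": "vendors",
        "user": "user",
        "password": "password",
    },
)
vendors_dyf.toDF().createOrReplaceTempView("vendors_table")

# Write the queued rows back through the catalog connection attached to the job
glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=DynamicFrame.fromDF(dataoutdf, glueContext, "dataoutdf"),
    catalog_connection="webcosts-mysql",  # made-up connection name
    connection_options={"dbtable": "data_source_files", "database": "dbname"},
)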

def loadprisma():
    from pyspark.sql import functions as F
    mysql_url_webcosts = "jdbc:mysql://hostname/dbname"
    sqlContext.read.format("jdbc") \
        .option("url", mysql_url_webcosts) \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("dbtable", "data_source_files") \
        .option("user", "user") \
        .option("password", "password") \
        .load().createOrReplaceTempView("data_source_files_table")
    loadfiles = sqlContext.sql(
        "select * from data_source_files_table where SYSTEM_NAME = 'Prisma' and transformed = 0 "
        "and (DATA_SOURCE_FILE_ID >= 1 and DATA_SOURCE_FILE_ID <= 150)").cache()
    first_file_id = loadfiles.agg(F.min(loadfiles.DATA_SOURCE_FILE_ID)).rdd.map(lambda x: x[0]).take(1)
    #loadfiles.show()
    full_rdd = None
    # data_source_file_id, system_id, system_nm, data_source_id, data_source_nm,
    # vendor_id, vendor_nm, filepath, filename, process
    for l in loadfiles.rdd.map(lambda x: (x[0], str(x[13]), x[1], str(x[2]), x[3], str(x[4]), x[5],
                                          str(x[6]), str(x[7]), str(x[8]), x[9], str(x[10]),
                                          str(x[11]), str(x[12]), x[14], x[0], x[0])).collect():
        print(l[0], l[1], l[2], l[3], l[4], l[5], l[6], l[7], l[8], l[9], l[13], l[14], l[15], l[16])
        #print(l[7] + l[8])
        delim_flag = ''
        #print("delim in-> " + str(l[14]))
        if l[14] == "t":
            #print("Enter tab")
            data_source_t = str(l[0])
            # row = file id + the 7 metadata fields l[1:8] + the 89 data columns x[0:89]
            new_list_t = sc.textFile(l[8] + l[9]).map(lambda x: x.split("\t")) \
                .filter(lambda x: x[0] != 'supplier_code') \
                .filter(lambda x: x[37] != '' and x[37] != '0') \
                .map(lambda x: (data_source_t,) + l[1:8] + tuple(x[0:89]))
            delim_flag = 'delim_tab'
        elif l[14] == "c":
            #print("Enter comma")
            data_source_c = str(l[15])
            new_list_c = sc.textFile(l[8] + l[9]).map(lambda x: x.split(",")) \
                .filter(lambda x: x[0] != 'supplier_code') \
                .filter(lambda x: x[37] != '' and x[37] != '0') \
                .map(lambda x: (data_source_c,) + l[1:8] + tuple(x[0:89]))
            delim_flag = 'delim_com'
        else:
            #print("Enter pipe")
            data_source_p = str(l[16])
            new_list_p = sc.textFile(l[8] + l[9]).map(lambda x: x.split("|")) \
                .filter(lambda x: x[0] != 'supplier_code') \
                .filter(lambda x: x[37] != '' and x[37] != '0') \
                .map(lambda x: (data_source_p,) + l[1:8] + tuple(x[0:89]))
            delim_flag = 'delim_pip'
        #print("delim out-> " + str(l[14]))
        if full_rdd is None:
            # First file: replace the id column with the minimum DATA_SOURCE_FILE_ID
            if delim_flag == 'delim_tab':
                full_rdd = new_list_t.map(lambda x: (str(first_file_id[0]),) + x[1:97])
            elif delim_flag == 'delim_com':
                full_rdd = new_list_c.map(lambda x: (str(first_file_id[0]),) + x[1:97])
            else:
                full_rdd = new_list_p.map(lambda x: (str(first_file_id[0]),) + x[1:97])
        else:
            if delim_flag == 'delim_tab':
                full_rdd = sc.union([full_rdd, new_list_t])
            elif delim_flag == 'delim_com':
                full_rdd = sc.union([full_rdd, new_list_c])
            else:
                full_rdd = sc.union([full_rdd, new_list_p])
        # This will force the lazy evaluation to execute now, before l changes
        full_rdd.count()
    #print(full_rdd.map(lambda x: x[0]).distinct().collect())
    # Emit one tab-separated line per row; x[1] is dropped and x[45] is moved near
    # the end, with empty padding columns, matching the original field layout
    full_rdd.map(lambda x: "\t".join([
        str(x[0]), str(x[2]), str(x[3]), str(x[4]), str(x[5]), str(x[6]), str(x[7]),
        x[8], x[9], str(x[10]), x[11], x[12], x[13], x[14], x[15], x[16], x[17],
        x[18], x[19], x[20].encode('utf-8'), x[21], x[22], x[23], x[24], x[25],
        x[26], x[27], x[28], x[29], x[30], x[31], "", x[32], x[33], x[34], x[35],
        x[36], x[37], x[38], x[39], x[40], x[41], x[42], x[43], x[44], x[46],
        x[47], x[48], x[49], x[50], "", x[51], x[52], x[53], x[54], x[55], x[56],
        x[57], x[58], x[59], x[60], x[61], x[62], x[63], x[64], x[65], x[66],
        x[67], x[68], x[69], "", x[70], x[71], x[72], x[73], x[74], x[75], x[76],
        x[77], x[78], x[79], x[80], x[81], x[82], x[83], "", x[84], x[85], x[86],
        x[87], x[88], x[89], x[90], x[91], x[92], x[93], x[94], "", x[95], "",
        x[96], "", "", "", "", "", "", "", "", "", x[45], "0.0", "0.0"
    ])).saveAsTextFile("file:///root/webcosts/workdir/output/prisma_details_part1")  #.saveAsTextFile("/root/output/")
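
The other change I expect to need: Glue executors cannot see file:/// paths, so the wholeTextFiles/textFile inputs and the final saveAsTextFile all have to point at S3 (the bucket name below is made up):

# Stage the source files in S3 and pass the S3 prefix instead of a local path
queueFiles("sizmek", "s3://my-webcosts-bucket/workdir/sizmek/")

# ...and point the final save in loadprisma() at S3 as well:
# .saveAsTextFile("s3://my-webcosts-bucket/output/prisma_details_part1")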