Regarding a PySpark function error

apache-spark

#1

Hi,
I am getting the error shown in the screenshot attached below when I execute the following code:

import itertools

def getTopDenseN(rec, topN):
    x = []
    prodPrices = []
    # Collect the price (5th comma-separated field) of each record in the group
    for i in rec[1]:
        prodPrices.append(float(i.split(",")[4]))
    # Distinct prices in descending order, then keep the top N (dense ranking)
    prodPricesDesc = list(sorted(set(prodPrices), reverse=True))
    topNPrices = list(itertools.islice(prodPricesDesc, 0, topN))
    # Emit the records whose price falls in the top N prices, ordered by price descending
    for j in sorted(rec[1], key=lambda k: float(k.split(",")[4]), reverse=True):
        if float(j.split(",")[4]) in topNPrices:
            x.append(j)
    return (y for y in x)

for i in productsMap.groupByKey().flatMap(lambda x: getTopDenseN(x, 2)).collect(): print(i)
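
For reference, getTopDenseN assumes each value in productsMap is a raw CSV line; a minimal sketch of such a setup (the file path and key field here are placeholders, not from my actual job):

# Values must be raw CSV strings, since getTopDenseN calls split(",") on them.
products = sc.textFile("/path/to/products")
# Key by category id (assumed to be field 1), keep the whole line as the value
productsMap = products.map(lambda p: (p.split(",")[1], p))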

Can you tell me what the error is?


#2

It seems there is a data type mismatch, as the error clearly says. Can you check that your RDDs are created properly, with the correct data types?
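
One quick way to check is to inspect a sample element of the pair RDD; a minimal sketch (variable names follow the code in the first post):

# getTopDenseN expects (key, iterable of raw CSV strings) after groupByKey;
# before grouping, each value should be a plain str.
sample = productsMap.first()
print(type(sample[0]), type(sample[1]))  # expected: <class 'str'> <class 'str'>
# If the value is a list/tuple of fields instead of a str, i.split(",") raises
# AttributeError; if the price field is not numeric, float() raises ValueError.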


#3

@ravi.tejarockon Are you telling me to check the data types of elements in the products table?


#4

Not the table data types, the RDD data types.
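
The distinction matters because an RDD read with textFile() contains only strings, whatever the table schema declares; a minimal sketch of where that shows up (names follow the sketch in the first post):

line = products.first()
print(type(line))                 # <class 'str'>, even if the table declares a numeric price column
price_field = line.split(",")[4]
# float(price_field) is the step that fails if this field is empty or has been
# shifted by an embedded comma in another column.
print(float(price_field))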