Spark Core APIs using Scala - Top N Priced Products

apache-spark
scala

#1

I am able to get the products having the top N prices, but the other part of the exercise, i.e. Top N Priced Products (the dense variant), is not working for me.

Inputs: the products file is delimited by pipes (|) and the categories file by commas (,).

Here is my code:

val Products = sc.textFile("/user/cloudera/problem2/products/")
val Categories = sc.textFile("/user/cloudera/categoriesfinal")

val ProductsMap = Products.map(r => { val d = r.split("|"); (d(1).toInt, (d(2), d(4).toDouble)) })
val CategoriesMap = Categories.map(r => { val d = r.split(","); (d(0).toInt, d(2)) })

val PCJoin = ProductsMap.join(CategoriesMap)

PCJoin.take(10).foreach(println)  // RDDs have no show(); print a sample instead

val PCMap = PCJoin.map(r => (r._2._2, r._2._1))

val PCMapGBK = PCMap.groupByKey()

def topresults(a: Iterable[(String, Double)], topN: Int): Iterable[(String, Double)] = {
  a.toList.sortBy(r => -r._2).take(topN)
}

val PCResult = PCMapGBK.map(r => (r._1, topresults(r._2, 2)))

Not working:

def topdensedresults(a: Iterable[(String, Double)], topN: Int): Iterable[(String, Double)] = {
  val temp = a.toList.sortBy(r => -r._2).distinct.take(topN)
  a.toList.sortBy(r => -r._2).filter(r => temp.contains(r._2.toString))
}

Kindly let me know if someone can spot what I have missed here.
Thanks in advance.
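
For reference, a minimal sketch of the dense variant, assuming the intent is to keep every product whose price falls among the top N distinct prices. The helper name topDenseResults is illustrative; it takes the same (name, price) tuples as above, but builds the cut-off list from prices rather than from whole tuples:

// Collect the top N distinct prices, then keep every product priced at one of them.
def topDenseResults(a: Iterable[(String, Double)], topN: Int): Iterable[(String, Double)] = {
  val topPrices = a.map(_._2).toList.distinct.sortBy(-_).take(topN)
  a.toList.sortBy(r => -r._2).filter(r => topPrices.contains(r._2))
}

// Usage, mirroring PCResult above:
val PCDenseResult = PCMapGBK.map(r => (r._1, topDenseResults(r._2, 2)))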


#2

Try escaping the | in r.split("|") as r.split("\\|").


#3

It should be r.split("\\|")


#4

Unfortunately, it is not working with either a forward slash or a backslash:


#5

Why don't you try it with double quotes and just the pipe symbol?


#6

This is the solution.

pipedata.map(rec => rec.split("\\|"))


#7

pipedata.map(rec => rec.split("\\|")).first()

It should be \\| in double quotes (two backslashes followed by the pipe); this forum has some issue displaying the escape.
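
A quick illustration of the difference (the sample record below is made up, but mirrors the pipe-delimited products layout):

// "|" is a regex metacharacter (alternation), so split("|") matches the empty
// string and chops the line into single characters instead of fields.
// Escaping it as "\\|" splits on the literal pipe.
val sample = "1|2|Quest Q64 10 FT. x 10 FT. Slant Leg Instant U|desc|59.98|http://example.com"
sample.split("|")(4)    // a single character, not a field
sample.split("\\|")(4)  // "59.98", the price field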


#8

So you mean two backslashes followed by the pipe, everything in double quotes?


#9

Finally, this worked as suggested by @venkatwilliams.

Thanks a lot!


#10

Now the main code is not working; it is returning an empty list.


#11

@nitesh - Check that you have filtered out product id 685; it is bad data.
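
In case it helps, one common guard is to drop rows whose price field does not parse before calling toDouble, rather than filtering on the specific id. A sketch, assuming the price is field index 4 as in the code above (CleanProducts is just an illustrative name):

// Keep only rows whose price field parses as a Double; this drops bad records
// such as the one mentioned above with an unparsable price.
val CleanProducts = Products
  .map(_.split("\\|"))
  .filter(d => scala.util.Try(d(4).toDouble).isSuccess)
  .map(d => (d(1).toInt, (d(2), d(4).toDouble)))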


#12

I have posted another query to solve the same problem using DataFrames and Spark SQL.