Exercise 17 - Get topN (Dense or Sparse)

Problem

• Get products data set
• Make sure product with id 685 is filtered out
• Get topN priced products (either using sparse rank or dense rank)
• Hint: Use groupByKey and explicit function

• Function logic
• Code snippet
• Sample output
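
The two ranking styles named in the problem differ only when prices tie. Here is a minimal sketch on plain Scala collections (no Spark). The object and function names are illustrative, and the prices are borrowed from the category 34 sample output further down this thread:

```scala
object RankKinds {
  // Positional ("sparse") top N: sort descending and keep the first n rows.
  def sparseTopN(prices: List[Double], n: Int): List[Double] =
    prices.sortBy(p => -p).take(n)

  // Dense top N: keep every row whose price is among the n highest distinct prices.
  def denseTopN(prices: List[Double], n: Int): List[Double] = {
    val topPrices = prices.distinct.sortBy(p => -p).take(n).toSet
    prices.filter(topPrices.contains).sortBy(p => -p)
  }

  def main(args: Array[String]): Unit = {
    val prices = List(149.99, 169.99, 149.99, 139.99, 169.99, 149.99)
    println(sparseTopN(prices, 2)) // List(169.99, 169.99)
    println(denseTopN(prices, 2))  // List(169.99, 169.99, 149.99, 149.99, 149.99)
  }
}
```

With `n = 2`, the positional version returns exactly two rows, while the dense version returns all five rows that carry one of the two highest distinct prices.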

#Sparse function
def getTopSparseN(rec:(String,Iterable[String]),topN:Int):Iterable[String] ={
val sortedRDDs=rec._2.toList.sortBy(k=> -k.split(",")(4).toFloat)
return sortedRDDs.take(topN)
}

#Code Snippet
```
val products=sc.textFile("/user/nagellarajashyam/sqoop_import/products")

val filteredProducts=products.filter(rec=>rec.split(",")(4).length>0)

val groupedProducts=filteredProducts.map(rec=>(rec.split(",")(1),rec))

val groupedRDD=groupedProducts.groupByKey()

val sortedRDD=groupedRDD.flatMap(rec=>getTopSparseN(rec,4))
```
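
The filter in the snippet keeps only records whose field at index 4 is non-empty. A more defensive variant is sketched below. It assumes index 4 holds the price in a six-field record (id, category, name, description, price, image) and treats any missing or non-numeric price as absent. `priceOf` and the second sample record are hypothetical:

```scala
import scala.util.Try

object PriceParsing {
  // Returns the price at index 4, or None when the field is missing, empty, or not numeric.
  def priceOf(rec: String): Option[Float] =
    Try(rec.split(",")(4).toFloat).toOption

  def main(args: Array[String]): Unit = {
    val good = "60,4,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical"
    // Hypothetical malformed record: a comma inside the name shifts the empty
    // description field to index 4, so the price can no longer be read there.
    val shifted = "9999,1,Example Irons 4-PW, AW,,899.99,http://example.invalid/irons"
    println(priceOf(good))    // Some(999.99)
    println(priceOf(shifted)) // None
  }
}
```

In a Spark job this could be used as `products.filter(rec => PriceParsing.priceOf(rec).isDefined)`, which drops shifted records without naming a specific product id.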

#Sample output
60,4,SOLE E25 Elliptical,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
162,8,YETI Tundra 65 Chest Cooler,399.99,http://images.acmesports.sports/YETI+Tundra+65+Chest+Cooler
153,8,Teeter Hang Ups NXT-S Inversion Table,299.99,http://images.acmesports.sports/Teeter+Hang+Ups+NXT-S+Inversion+Table
150,8,Quest 12’ x 12’ Dome Canopy,149.99,http://images.acmesports.sports/Quest+12’+x+12’+Dome+Canopy
1279,57,PUMA Men’s evoPOWER 1 Tricks FG Soccer Cleat,189.99,http://images.acmesports.sports/PUMA+Men’s+evoPOWER+1+Tricks+FG+Soccer+Cleat
1297,57,PUMA Men’s evoSPEED 1.2 Tricks FG Soccer Clea,174.99,http://images.acmesports.sports/PUMA+Men’s+evoSPEED+1.2+Tricks+FG+Soccer+Cleat
450,20,Garmin Forerunner 220 GPS Watch,249.99,http://images.acmesports.sports/Garmin+Forerunner+220+GPS+Watch
452,20,Garmin vivofit Fitness Band with HRM,169.99,http://images.acmesports.sports/Garmin+vivofit+Fitness+Band+with+HRM
437,20,ASICS Women’s GEL-Noosa Tri 9 Running Shoe,139.99,http://images.acmesports.sports/ASICS+Women’s+GEL-Noosa+Tri+9+Running+Shoe
451,20,Garmin Women’s Forerunner 10 GPS Watch,129.99,http://images.acmesports.sports/Garmin+Women’s+Forerunner+10+GPS+Watch
407,19,PUMA Men’s evoPOWER 1 Tricks FG Soccer Cleat,189.99,http://images.acmesports.sports/PUMA+Men’s+evoPOWER+1+Tricks+FG+Soccer+Cleat
425,19,Nike Men’s Air Max 2014 Running Shoe,149.99,http://images.acmesports.sports/Nike+Men’s+Air+Max+2014+Running+Shoe
417,19,Under Armour Men’s Highlight MC Alter Ego Fla,139.99,http://images.acmesports.sports/Under+Armour+Men’s+Highlight+MC+Alter+Ego+Flash+Football…
424,19,Under Armour Men’s Highlight MC Alter Ego Hul,139.99,http://images.acmesports.sports/Under+Armour+Men’s+Highlight+MC+Alter+Ego+Hulk+Football…
320,15,Manduka PRO Yoga Mat,100.0,http://images.acmesports.sports/Manduka+PRO+Yoga+Mat
322,15,Nike Women’s Studio Wrap Three-Part Pack,81.99,http://images.acmesports.sports/Nike+Women’s+Studio+Wrap+Three-Part+Pack
325,15,lucy Women’s Hatha Capri Leggings,79.0,http://images.acmesports.sports/lucy+Women’s+Hatha+Capri+Leggings
316,15,Nike Women’s Legend Regular Capris 2.0,65.0,http://images.acmesports.sports/Nike+Women’s+Legend+Regular+Capris+2.0

Function logic
def getTopDenseN(rec: (String, Iterable[String]), topN: Int): Iterable[String] = {
var prodPrices: List[Float] = List()
var topNPrices: List[Float] = List()
var sortedRecs: List[String] = List()
for(i <- rec._2) {
prodPrices = prodPrices:+ i.split(",")(4).toFloat
}
topNPrices = prodPrices.distinct.sortBy(k => -k).take(topN)
sortedRecs = rec._2.toList.sortBy(k => -k.split(",")(4).toFloat)
var x: List[String] = List()
for(i <- sortedRecs) {
if(topNPrices.contains(i.split(",")(4).toFloat))
x = x:+ i
}
return x
}
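
The same dense-rank logic can be written without mutable lists and repeated `:+` appends. This is a sketch of an equivalent formulation, not a replacement for the posted function. The object name, the `price` helper, and the sample records (six-field layout, placeholder names and URLs) are made up for illustration:

```scala
object DenseTopN {
  // Price is read from index 4, as in the function above.
  private def price(rec: String): Float = rec.split(",")(4).toFloat

  def getTopDenseN(rec: (String, Iterable[String]), topN: Int): Iterable[String] = {
    val records = rec._2.toList
    // The topN highest distinct prices in this group.
    val topNPrices = records.map(price).distinct.sortBy(p => -p).take(topN).toSet
    // Keep every record at one of those prices, highest price first.
    records.filter(r => topNPrices.contains(price(r))).sortBy(r => -price(r))
  }

  def main(args: Array[String]): Unit = {
    val group = ("34", List(
      "1,34,A,,149.99,u",
      "2,34,B,,169.99,u",
      "3,34,C,,149.99,u",
      "4,34,D,,129.99,u"
    ))
    getTopDenseN(group, 2).foreach(println)
  }
}
```

Because `sortBy` is stable, records that share a price keep their original relative order, so the run above prints record 2 first, then records 1 and 3.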

Code snippet:
// Compare the id with the string "685"; comparing a String with the Int 685 is always false
val products = sc.textFile("/public/retail_db/products").filter(rec => !rec.split(",")(0).equals("685"))

val productsMap = products.map(rec => (rec.split(",")(1), rec))
val denserank = productsMap.groupByKey().flatMap(x => getTopDenseN(x, 2))
denserank.take(10).foreach(println)
Sample output:
60,4,SOLE E25 Elliptical,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
162,8,YETI Tundra 65 Chest Cooler,399.99,http://images.acmesports.sports/YETI+Tundra+65+Chest+Cooler
153,8,Teeter Hang Ups NXT-S Inversion Table,299.99,http://images.acmesports.sports/Teeter+Hang+Ups+NXT-S+Inversion+Table
1279,57,PUMA Men’s evoPOWER 1 Tricks FG Soccer Cleat,189.99,http://images.acmesports.sports/PUMA+Men’s+evoPOWER+1+Tricks+FG+Soccer+Cleat
1297,57,PUMA Men’s evoSPEED 1.2 Tricks FG Soccer Clea,174.99,http://images.acmesports.sports/PUMA+Men’s+evoSPEED+1.2+Tricks+FG+Soccer+Cleat
450,20,Garmin Forerunner 220 GPS Watch,249.99,http://images.acmesports.sports/Garmin+Forerunner+220+GPS+Watch
452,20,Garmin vivofit Fitness Band with HRM,169.99,http://images.acmesports.sports/Garmin+vivofit+Fitness+Band+with+HRM
407,19,PUMA Men’s evoPOWER 1 Tricks FG Soccer Cleat,189.99,http://images.acmesports.sports/PUMA+Men’s+evoPOWER+1+Tricks+FG+Soccer+Cleat
425,19,Nike Men’s Air Max 2014 Running Shoe,149.99,http://images.acmesports.sports/Nike+Men’s+Air+Max+2014+Running+Shoe

val productsrdd=sc.textFile("/public/retail_db/products")

val productid=productsrdd.filter(!_.split(",")(0).equals("685"))
val productsMap= productid.map(rec=>(rec.split(",")(1),rec))
(2,1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy)
(2,2,2,Under Armour Men’s Highlight MC Football Clea,129.99,http://images.acmesports.sports/Under+Armour+Men’s+Highlight+MC+Football+Cleat)

val productGroupby=productsMap.groupByKey()
(4,CompactBuffer(49,4,Diamondback Adult Sorrento Mountain Bike 2014,299.98,http://images.acmesports.sports/Diamondback+Adult+Sorrento+Mountain+Bike+2014, 50,4,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy, 51,4,MAC Sports Collapsible Wagon,69.99,http://images.acmesports.sports/MAC+Sports+Collapsible+Wagon, 52,4,Easton Mako Youth Bat 2014 (-11),249.97,http://images.acmesports.sports/Easton+Mako+Youth+Bat+2014+(-11), 53,4,adidas Brazuca 2014 Top Glider Soccer Ball,29.99,http://images.acmesports.sports/adidas+Brazuca+2014+Top+Glider+Soccer+Ball, 54,4,Nike+ Fuelband SE,99.0,http://images.acmesports.sports/Nike%2B+Fuelband+SE, 55,4,adidas Brazuca 2014 Top Repliqué Soccer Ball,39.99,http://images.acmesports.sports/adidas+Brazuca+2014+Top+Repliqué+Soccer+Ball, 56,4,Fitbit Flex Wireless Activity & Sleep Wristba,99.95,http://images.acmesports.sports/Fitbit+Flex+Wireless+Activity+%26+Sleep+Wristband, 57,4,“Nike Women’s Pro Core 3"” Compression Shorts",28.0,http://images.acmesports.sports/Nike+Women’s+Pro+Core+3"+Compression+Shorts, 58,4,Diamondback Boys’ Insight 24 Performance Hybr,299.99,http://images.acmesports.sports/Diamondback+Boys’+Insight+24+Performance+Hybrid+Bike+2014, 59,4,adidas Brazuca 2014 Official Match Ball,159.99,http://images.acmesports.sports/adidas+Brazuca+2014+Official+Match+Ball, 60,4,SOLE E25 Elliptical,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical, 61,4,Diamondback Girls’ Clarity 24 Hybrid Bike 201,299.99,http://images.acmesports.sports/Diamondback+Girls’+Clarity+24+Hybrid+Bike+2014, 62,4,Easton XL1 Youth Bat 2014 (-10),179.97,http://images.acmesports.sports/Easton+XL1+Youth+Bat+2014+(-10), 63,4,Fitness Gear 300 lb Olympic Weight Set,209.99,http://images.acmesports.sports/Fitness+Gear+300+lb+Olympic+Weight+Set, 64,4,Nike Women’s Pro Victory Compression Bra,21.99,http://images.acmesports.sports/Nike+Women’s+Pro+Victory+Compression+Bra, 65,4,Quik Shade Summit SX170 10 FT. x 10 FT. Canop,199.99,http://images.acmesports.sports/Quik+Shade+Summit+SX170+10+FT.+x+10+FT.+Canopy, 66,4,SOLE F85 Treadmill,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill, 67,4,Kijaro Dual Lock Chair,29.99,http://images.acmesports.sports/Kijaro+Dual+Lock+Chair, 68,4,Diamondback Adult Outlook Mountain Bike 2014,309.99,http://images.acmesports.sports/Diamondback+Adult+Outlook+Mountain+Bike+2014, 69,4,Easton S1 Youth Bat 2014 (-12),179.97,http://images.acmesports.sports/Easton+S1+Youth+Bat+2014+(-12), 70,4,Elevation Training Mask 2.0,79.99,http://images.acmesports.sports/Elevation+Training+Mask+2.0, 71,4,Diamondback Adult Response XE Mountain Bike 2,349.98,http://images.acmesports.sports/Diamondback+Adult+Response+XE+Mountain+Bike+2014, 72,4,Quest 12’ x 12’ Dome Canopy,149.99,http://images.acmesports.sports/Quest+12’+x+12’+Dome+Canopy))
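
The `(4,CompactBuffer(...))` line above is one element of the grouped RDD: a category id paired with all records of that category. On a local collection the same shape can be reproduced with `groupBy`, which may help when testing a top-N function without a cluster. The object name and the shortened records below are hypothetical stand-ins:

```scala
object GroupShape {
  // Local analogue of map(rec => (categoryId, rec)) followed by groupByKey().
  def groupByCategory(records: List[String]): Map[String, List[String]] =
    records.groupBy(rec => rec.split(",")(1))

  def main(args: Array[String]): Unit = {
    val records = List(
      "49,4,Bike,,299.98,u",
      "60,4,Elliptical,,999.99,u",
      "162,8,Cooler,,399.99,u"
    )
    val grouped = groupByCategory(records)
    // Each entry has the same (key, records) shape that groupByKey produces per category.
    grouped.foreach { case (category, recs) => println(s"$category -> $recs") }
  }
}
```

Each `(category, recs)` pair can then be passed to a function with the `(String, Iterable[String])` signature used in this thread.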

def getTopSparseN(rec:(String,Iterable[String]),topN:Int):Iterable[String] ={
// Sort by price (index 4) descending and keep the first topN records
val sortedRDDs=rec._2.toList.sortBy(k=> -k.split(",")(4).toFloat)
return sortedRDDs.take(topN)
}

val sortedRDD=productGroupby.flatMap(rec=>getTopSparseN(rec,4))

60,4,SOLE E25 Elliptical,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical

#Code

``````
package training

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Level
import org.apache.log4j.Logger
import com.typesafe.config.ConfigFactory

case class Orders(id: Int, date: String, customerID: Int, status: String)
case class OrderItems(id: Int, orderId: Int, productId: Int, quantity: Int, subtotal: Float, productPrice: Float)
case class Products(id: Int, categoryId: Int, name: String, description: String, price: Float, image: String)

object FromTraining {
def main(args: Array[String]): Unit = {

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

// Assumption: appConfig is loaded from the application.conf on the classpath
val appConfig = ConfigFactory.load()
val master = appConfig.getConfig(args(2)).getString("deployment")

val conf = new SparkConf().setMaster(master).setAppName("Spark on Retail_db")
val sc = new SparkContext(conf)

// Assumption: fs is the Hadoop file system configured for this Spark context
val fs = FileSystem.get(sc.hadoopConfiguration)

val inputPath = args(0)
val outputPath = args(1)

val inputPathExists = fs.exists(new Path(inputPath))
val outputPathExists = fs.exists(new Path(outputPath))

if (!inputPathExists) {
println("Input path does not exist")
return
}

if (outputPathExists) {
fs.delete(new Path(outputPath), true)
}

val orders = sc.textFile(inputPath + "/orders").flatMap { each =>
each.split(",") match {
case Array(id, date, customerId, status) => Some(Orders(id.toInt, date, customerId.toInt, status))
case _ => None
}
}
val orderItems = sc.textFile(inputPath + "/order_items").flatMap { each =>
each.split(",") match {
case Array(id, orderId, productId, quantity, subtotal, productPrice) => Some(OrderItems(id.toInt, orderId.toInt, productId.toInt, quantity.toInt, subtotal.toFloat, productPrice.toFloat))
case _ => None
}
}

val products = sc.textFile(inputPath + "/products").flatMap { each =>
each.split(",") match {
case Array(id, category_id, name, description, price, image) => Some(Products(id.toInt, category_id.toInt, name, description, price.toFloat, image))
case _ => None
}
}

// Exercise 16

val completedOrders = orders.filter(eachOrder => eachOrder.status.contains("COMPLETE"))

val ordersToFilter = completedOrders.map(eachOrder => (eachOrder.id, eachOrder.date))

val orderItemsToFilter = orderItems.map(eachOrderItem => (eachOrderItem.orderId, eachOrderItem.subtotal))

val joinedOrders = orderItemsToFilter.join(ordersToFilter)

val afterJoin = joinedOrders.flatMap {
case (orderId, (subTotal, orderDate)) => Some(orderDate, subTotal)
}

val aggregatedResult = afterJoin.aggregateByKey((0.0, 0))((a, v) => (a._1 + v, a._2 + 1), (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))

val averageResultsForEachDate = aggregatedResult.map {
case (date, (totalrevenue, totalDays)) =>
(date, totalrevenue / totalDays)
}

val sortedResult = averageResultsForEachDate.sortByKey(ascending = false) //.take(5).foreach(println)

//   sortedResult.saveAsTextFile(outputPath)

// Exercise 17

val filteredProduct = products.filter(_.id != 685)

val mappedProducts = filteredProduct.map(product => (product.categoryId, product))

val groupedMappedProducts = mappedProducts.groupByKey()

val flattenedGroupMappedProducts = groupedMappedProducts.flatMap(each => getTopDense(each, 2))

flattenedGroupMappedProducts.saveAsTextFile(outputPath + "/excersize17")

}

def getTopDense(rec: (Int, Iterable[Products]), topN: Int): List[Products] = {
var productPrices: List[Float] = List()
var topNPrices: List[Float] = List()
var sortedRecords: List[Products] = List()

for (eachProduct <- rec._2) {
productPrices = productPrices :+ eachProduct.price
}

topNPrices = productPrices.distinct.sortBy(price => -price).take(topN)

var ranked: List[Products] = List()

sortedRecords = rec._2.toList.sortBy(product => product.price)
for (each <- sortedRecords) {
if (topNPrices.contains(each.price))
ranked = ranked :+ each
}
ranked
}

}
``````

#Sample output

```
Products(745,34,Ogio City Spiked Golf Shoes,,149.99,http://images.acmesports.sports/Ogio+City+Spiked+Golf+Shoes)
Products(746,34,Ogio City Spiked Golf Shoes,,149.99,http://images.acmesports.sports/Ogio+City+Spiked+Golf+Shoes)
Products(747,34,Ogio City Spiked Golf Shoes,,149.99,http://images.acmesports.sports/Ogio+City+Spiked+Golf+Shoes)
Products(743,34,Ogio Race Golf Shoes,,169.99,http://images.acmesports.sports/Ogio+Race+Golf+Shoes)
Products(744,34,Ogio Race Golf Shoes,,169.99,http://images.acmesports.sports/Ogio+Race+Golf+Shoes)
```

Code Snippet:

val products = sc.textFile("/public/retail_db/products")

val filteredProducts = products.filter(!_.split(",")(0).equals("685"))

val productsMap = filteredProducts.map(rec => (rec.split(",")(1), rec))

val productsGroupBy = productsMap.groupByKey()

val topPricedProducts = productsGroupBy.flatMap(rec => getTopDenseN(rec,5))

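Function Logic:

```
def getTopDenseN(rec: (String, Iterable[String]), topN: Int): Iterable[String] = {
  // Sort by price (index 4) descending and keep the first topN records.
  // Note: this is a positional top N; tied prices are not grouped into one rank.
  val sortedRecords = rec._2.toList.sortBy(k => -k.split(",")(4).toFloat)
  sortedRecords.take(topN)
}
```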

Sample Output:

60,4,SOLE E25 Elliptical,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
58,4,Diamondback Boys’ Insight 24 Performance Hybr,299.99,http://images.acmesports.sports/Diamondback+Boys’+Insight+24+Performance+Hybrid+Bike+2014
162,8,YETI Tundra 65 Chest Cooler,399.99,http://images.acmesports.sports/YETI+Tundra+65+Chest+Cooler
153,8,Teeter Hang Ups NXT-S Inversion Table,299.99,http://images.acmesports.sports/Teeter+Hang+Ups+NXT-S+Inversion+Table
150,8,Quest 12’ x 12’ Dome Canopy,149.99,http://images.acmesports.sports/Quest+12’+x+12’+Dome+Canopy
155,8,E-Z Up Vista 12’ x 12’ Recreational Instant S,134.99,http://images.acmesports.sports/E-Z+Up+Vista+12’+x+12’+Recreational+Instant+Shelter
1279,57,PUMA Men’s evoPOWER 1 Tricks FG Soccer Cleat,189.99,http://images.acmesports.sports/PUMA+Men’s+evoPOWER+1+Tricks+FG+Soccer+Cleat
1297,57,PUMA Men’s evoSPEED 1.2 Tricks FG Soccer Clea,174.99,http://images.acmesports.sports/PUMA+Men’s+evoSPEED+1.2+Tricks+FG+Soccer+Cleat
450,20,Garmin Forerunner 220 GPS Watch,249.99,http://images.acmesports.sports/Garmin+Forerunner+220+GPS+Watch
452,20,Garmin vivofit Fitness Band with HRM,169.99,http://images.acmesports.sports/Garmin+vivofit+Fitness+Band+with+HRM
437,20,ASICS Women’s GEL-Noosa Tri 9 Running Shoe,139.99,http://images.acmesports.sports/ASICS+Women’s+GEL-Noosa+Tri+9+Running+Shoe
451,20,Garmin Women’s Forerunner 10 GPS Watch,129.99,http://images.acmesports.sports/Garmin+Women’s+Forerunner+10+GPS+Watch
439,20,ASICS Women’s GEL-Nimbus 15 Running Shoe,119.99,http://images.acmesports.sports/ASICS+Women’s+GEL-Nimbus+15+Running+Shoe

```
package org.test.spark

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import com.typesafe.config.ConfigFactory

object GetTopN {
  def main(args: Array[String]): Unit = {
    // Assumption: appConf is loaded from the application.conf on the classpath
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().setAppName("spark exercise 17")
      .setMaster(appConf.getConfig(args(2)).getString("deployment"))
    val sc = new SparkContext(conf)
    // Assumption: fs is the Hadoop file system configured for this Spark context
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPath = args(0)
    val outputPath = args(1)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))
    if (!inputPathExists) {
      println("Input path does not exist")
      return
    }

    if (outputPathExists) {
      fs.delete(new Path(outputPath), true)
    }
    val products = sc.textFile(inputPath + "/products")
    val pfiltered = products.filter { x => !x.split(",")(0).equals("685") }
    val productsMap = pfiltered.map(rec => (rec.split(",")(1), rec))
    val productsGroupBy = productsMap.groupByKey()

    // Sort each category by price (index 4) descending and keep the first topN records
    def getTopValues(rec: (String, Iterable[String]), topN: Int): Iterable[String] = {
      val psort = rec._2.toList.sortBy(k => -k.split(",")(4).toFloat)
      psort.take(topN)
    }
    val sort = productsGroupBy.flatMap(rec => getTopValues(rec, 4))
    sort.saveAsTextFile(outputPath)

    sc.stop()
  }
}
```

def getTopDenseN(rec: (Int, Iterable[String]), topN: Int): Iterable[String] = {
var prodPrices: List[Float] = List()
var topNPrices: List[Float] = List()
var sortedRecs: List[String] = List()
for (i <- rec._2) {
prodPrices = prodPrices :+ i.split(",")(4).toFloat
}
topNPrices = prodPrices.distinct.sortBy(k => -k).take(topN)
sortedRecs = rec._2.toList.sortBy(k => -k.split(",")(4).toFloat)
var x: List[String] = List()
for (i <- sortedRecs) {
if (topNPrices.contains(i.split(",")(4).toFloat))
x = x :+ i
}
return x

}

val products = sc.textFile(inputPath + "/products")
val filteredProducts = products.filter(rec => (!rec.split(",")(4).isEmpty))
val productsMap = filteredProducts.map(rec => ((rec.split(",")(1).toInt), rec))
val productsGroupByKey = productsMap.groupByKey()
val sortedFlatMap = productsGroupByKey.flatMap(rec => (getTopDenseN(rec, 10)))

743,34,Ogio Race Golf Shoes,169.99,http://images.acmesports.sports/Ogio+Race+Golf+Shoes
744,34,Ogio Race Golf Shoes,169.99,http://images.acmesports.sports/Ogio+Race+Golf+Shoes
745,34,Ogio City Spiked Golf Shoes,149.99,http://images.acmesports.sports/Ogio+City+Spiked+Golf+Shoes
746,34,Ogio City Spiked Golf Shoes,149.99,http://images.acmesports.sports/Ogio+City+Spiked+Golf+Shoes
747,34,Ogio City Spiked Golf Shoes,149.99,http://images.acmesports.sports/Ogio+City+Spiked+Golf+Shoes
759,34,Nike Lunarwaverly Golf Shoes,139.99,http://images.acmesports.sports/Nike+Lunarwaverly+Golf+Shoes
760,34,Nike Lunarwaverly Golf Shoes,139.99,http://images.acmesports.sports/Nike+Lunarwaverly+Golf+Shoes
761,34,Nike Lunarwaverly Golf Shoes,139.99,http://images.acmesports.sports/Nike+Lunarwaverly+Golf+Shoes
762,34,Nike Lunarwaverly Golf Shoes,139.99,http://images.acmesports.sports/Nike+Lunarwaverly+Golf+Shoes
748,34,Ogio City Turf Golf Shoes,129.99,http://images.acmesports.sports/Ogio+City+Turf+Golf+Shoes

```
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.typesafe.config.ConfigFactory

object SparkExercise17 {

  def main(args: Array[String]): Unit = {
    // Assumption: appConf is loaded from the application.conf on the classpath
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().setAppName("Sneh Exercise 17").setMaster(appConf.getConfig(args(2)).getString("deployment"))
    val sc = new SparkContext(conf)
    // Assumption: fs is the Hadoop file system configured for this Spark context
    val fs = FileSystem.get(sc.hadoopConfiguration)

    val inputPath = args(0)
    val outputPath = args(1)

    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))

    if (!inputPathExists) {
      println("Input path does not exist")
      return
    }

    if (outputPathExists) {
      fs.delete(new Path(outputPath), true)
    }

    val products = sc.textFile(inputPath + "/products")

    // Drop records whose price field (index 4) is empty
    val productsFiltererd = products.filter(rec => (!rec.split(",")(4).isEmpty))
    val productsMapped = productsFiltererd.map(rec => (rec.split(",")(1).toInt, rec))
    val productsGrpByKey = productsMapped.groupByKey()
    val productsByCatId = productsGrpByKey.flatMap(rec => getTopDenseN(rec, 10))
    productsByCatId.saveAsTextFile(outputPath)

    sc.stop()
  }

  def getTopDenseN(rec: (Int, Iterable[String]), topN: Int): Iterable[String] = {
    var prodPrices: List[Float] = List()
    var topNPrices: List[Float] = List()
    var sortedRecs: List[String] = List()
    for (i <- rec._2) {
      prodPrices = prodPrices :+ i.split(",")(4).toFloat
    }
    // The topN highest distinct prices in this category
    topNPrices = prodPrices.distinct.sortBy(k => -k).take(topN)
    sortedRecs = rec._2.toList.sortBy(k => -k.split(",")(4).toFloat)
    var x: List[String] = List()
    for (i <- sortedRecs) {
      if (topNPrices.contains(i.split(",")(4).toFloat))
        x = x :+ i
    }
    return x
  }

}
```

package com.scala.avgrvn

import org.apache.spark.SparkConf
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkContext

object TopN {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("TopN")
val sc = new SparkContext(conf)
val product=sc.textFile("/user/saswat232/retail_db/products")
val prdct_fltr = product.filter(!_.split(",")(0).equals("685"))
val prdct_Map = prdct_fltr.map(rec => (rec.split(",")(4).toDouble, rec))
val prdct_grp = prdct_Map.groupByKey()
val top_priced = prdct_grp.flatMap(rec => getTopPrc(rec, 6))
top_priced.saveAsTextFile("/user/saswat232/retail_db/TopN")
}
def getTopPrc(rec: (Double,Iterable[String] ), TopN: Int) : Iterable[String]=
{
var prodPrices: List[Float] = List()
var topNPrices: List[Float] = List()
var sortedRecs: List[String] = List()
for(i <- rec._2) {
prodPrices = prodPrices:+ i.split(",")(4).toFloat
}
topNPrices = prodPrices.distinct.sortBy(k => -k).take(TopN)
sortedRecs = rec._2.toList.sortBy(k => -k.split(",")(4).toFloat)
var x: List[String] = List()
for(i <- sortedRecs) {
if(topNPrices.contains(i.split(",")(4).toFloat))
x = x:+ i
}
return x
}

}

Result

796,36,Callaway Women’s X-Tech Golf Glove - White/Pi,12.99,http://images.acmesports.sports/Callaway+Women’s+X-Tech+Golf+Glove+-+White%2FPink
814,37,Callaway HX Practice Golf Balls - 9 Pack,12.99,http://images.acmesports.sports/Callaway+HX+Practice+Golf+Balls+-+9+Pack
216,10,Yakima DoubleDown Ace Hitch Mount 4-Bike Rack,189.0,http://images.acmesports.sports/Yakima+DoubleDown+Ace+Hitch+Mount+4-Bike+Rack
952,43,Yakima DoubleDown Ace Hitch Mount 4-Bike Rack,189.0,http://images.acmesports.sports/Yakima+DoubleDown+Ace+Hitch+Mount+4-Bike+Rack
516,24,Callaway X Hot Laser Rangefinder,229.99,http://images.acmesports.sports/Callaway+X+Hot+Laser+Rangefinder
545,25,Callaway X Hot Laser Rangefinder,229.99,http://images.acmesports.sports/Callaway+X+Hot+Laser+Rangefinder
599,41,Callaway X Hot Laser Rangefinder,229.99,http://images.acmesports.sports/Callaway+X+Hot+Laser+Rangefinder
1064,48,Pelican Trailblazer 100 Kayak,229.99,http://images.acmesports.sports/Pelican+Trailblazer+100+Kayak
1071,48,Pelican Apex 100 Kayak,229.99,http://images.acmesports.sports/Pelican+Apex+100+Kayak
1091,49,Pelican Trailblazer 100 Kayak,229.99,http://images.acmesports.sports/Pelican+Trailblazer+100+Kayak

Function logic

```
def denseTopN(rec: (String, Iterable[String]), topN: Int): Iterable[String] = {
  var prodPrices: List[Float] = List()
  var topNPrices: List[Float] = List()
  var sortedRecs: List[String] = List()
  for (i <- rec._2) {
    prodPrices = prodPrices :+ i.split(",")(4).toFloat
  }
  topNPrices = prodPrices.distinct.sortBy(k => -k).take(topN)
  sortedRecs = rec._2.toList.sortBy(k => -k.split(",")(4).toFloat)
  var x: List[String] = List()
  for (i <- sortedRecs) {
    if (topNPrices.contains(i.split(",")(4).toFloat))
      x = x :+ i
  }
  return x
}
```

Code snippet

```
val loadFile = sc.textFile(inputpath1.toString())
val filterFile = loadFile.filter(rec => rec.split(",")(0).toInt != 685)

val mapFile = filterFile.map(rec => (rec.split(",")(0), rec))

val groupFile = mapFile.groupByKey()

val topPricedProducts = groupFile.flatMap(rec => denseTopN(rec, topN))

topPricedProducts.saveAsTextFile(outputpath.toString())
```

Sample output

``````
273,13,Under Armour Kids' Mercenary Slide,,27.99,http://images.acmesports.sports/Under+Armour+Kids%27+Mercenary+Slide
253,12,Nike Women's Free 5.0+ Running Shoe,,99.99,http://images.acmesports.sports/Nike+Women%27s+Free+5.0%2B+Running+Shoe
1148,51,Team Golf Pittsburgh Steelers Fairway Stand B,,179.99,http://images.acmesports.sports/Team+Golf+Pittsburgh+Steelers+Fairway+Stand+Bag