Exercise 17 - Get topN (Dense or Spark)

apache-spark
spark-shell
#1

Problem

  • Get products data set
  • Make sure product with id 685 is filtered out
  • Get topN priced products (either using sparse rank or dense rank)
  • Hint: Use groupByKey and explicit function

Please provide

  • Function logic
  • Code snippet
  • Sample output
0 Likes

#2

#Sparse function
def getTopSparseN(rec:(String,Iterable[String]),topN:Int):Iterable[String] ={
val sortedRDDs=rec._2.toList.sortBy(k=> -k.split(",")(4).toFloat)
return sortedRDDs.take(topN)
}

#Code Snippet
val products=sc.textFile("/user/nagellarajashyam/sqoop_import/products")

val filteredProducts=products.filter(rec=>rec.split(",")(4).length>0)

val groupedProducts=filteredProducts.map(rec=>(rec.split(",")(1),rec))

val groupedRDD=groupedProducts.groupByKey()

val sortedRDD=groupedRDD.flatMap(rec=>getTopSparseN(rec,4))

#Sample output
66,4,SOLE F85 Treadmill,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
60,4,SOLE E25 Elliptical,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
71,4,Diamondback Adult Response XE Mountain Bike 2,349.98,http://images.acmesports.sports/Diamondback+Adult+Response+XE+Mountain+Bike+2014
68,4,Diamondback Adult Outlook Mountain Bike 2014,309.99,http://images.acmesports.sports/Diamondback+Adult+Outlook+Mountain+Bike+2014
162,8,YETI Tundra 65 Chest Cooler,399.99,http://images.acmesports.sports/YETI+Tundra+65+Chest+Cooler
153,8,Teeter Hang Ups NXT-S Inversion Table,299.99,http://images.acmesports.sports/Teeter+Hang+Ups+NXT-S+Inversion+Table
148,8,Quik Shade Summit SX170 10 FT. x 10 FT. Canop,199.99,http://images.acmesports.sports/Quik+Shade+Summit+SX170+10+FT.+x+10+FT.+Canopy
150,8,Quest 12’ x 12’ Dome Canopy,149.99,http://images.acmesports.sports/Quest+12’+x+12’+Dome+Canopy
1279,57,PUMA Men’s evoPOWER 1 Tricks FG Soccer Cleat,189.99,http://images.acmesports.sports/PUMA+Men’s+evoPOWER+1+Tricks+FG+Soccer+Cleat
1297,57,PUMA Men’s evoSPEED 1.2 Tricks FG Soccer Clea,174.99,http://images.acmesports.sports/PUMA+Men’s+evoSPEED+1.2+Tricks+FG+Soccer+Cleat
1281,57,adidas Brazuca 2014 Official Match Ball,159.99,http://images.acmesports.sports/adidas+Brazuca+2014+Official+Match+Ball
1289,57,adidas Brazuca Final Rio Official Match Ball,159.99,http://images.acmesports.sports/adidas+Brazuca+Final+Rio+Official+Match+Ball
450,20,Garmin Forerunner 220 GPS Watch,249.99,http://images.acmesports.sports/Garmin+Forerunner+220+GPS+Watch
452,20,Garmin vivofit Fitness Band with HRM,169.99,http://images.acmesports.sports/Garmin+vivofit+Fitness+Band+with+HRM
437,20,ASICS Women’s GEL-Noosa Tri 9 Running Shoe,139.99,http://images.acmesports.sports/ASICS+Women’s+GEL-Noosa+Tri+9+Running+Shoe
451,20,Garmin Women’s Forerunner 10 GPS Watch,129.99,http://images.acmesports.sports/Garmin+Women’s+Forerunner+10+GPS+Watch
407,19,PUMA Men’s evoPOWER 1 Tricks FG Soccer Cleat,189.99,http://images.acmesports.sports/PUMA+Men’s+evoPOWER+1+Tricks+FG+Soccer+Cleat
425,19,Nike Men’s Air Max 2014 Running Shoe,149.99,http://images.acmesports.sports/Nike+Men’s+Air+Max+2014+Running+Shoe
417,19,Under Armour Men’s Highlight MC Alter Ego Fla,139.99,http://images.acmesports.sports/Under+Armour+Men’s+Highlight+MC+Alter+Ego+Flash+Football…
424,19,Under Armour Men’s Highlight MC Alter Ego Hul,139.99,http://images.acmesports.sports/Under+Armour+Men’s+Highlight+MC+Alter+Ego+Hulk+Football…
320,15,Manduka PRO Yoga Mat,100.0,http://images.acmesports.sports/Manduka+PRO+Yoga+Mat
322,15,Nike Women’s Studio Wrap Three-Part Pack,81.99,http://images.acmesports.sports/Nike+Women’s+Studio+Wrap+Three-Part+Pack
325,15,lucy Women’s Hatha Capri Leggings,79.0,http://images.acmesports.sports/lucy+Women’s+Hatha+Capri+Leggings
316,15,Nike Women’s Legend Regular Capris 2.0,65.0,http://images.acmesports.sports/Nike+Women’s+Legend+Regular+Capris+2.0

0 Likes

#3

Function logic
def getTopDenseN(rec: (String, Iterable[String]), topN: Int): Iterable[String] = {
var prodPrices: List[Float] = List()
var topNPrices: List[Float] = List()
var sortedRecs: List[String] = List()
for(i <- rec._2) {
prodPrices = prodPrices:+ i.split(",")(4).toFloat
}
topNPrices = prodPrices.distinct.sortBy(k => -k).take(topN)
sortedRecs = rec._2.toList.sortBy(k => -k.split(",")(4).toFloat)
var x: List[String] = List()
for(i <- sortedRecs) {
if(topNPrices.contains(i.split(",")(4).toFloat))
x = x:+ i
}
return x
}

Code snippet:
val products = sc.textFile("/public/retail_db/products").filter(rec => !rec.split(",")(0).equals(685))

val productsMap = products.map(rec => (rec.split(",")(1), rec))
val denserank = productsMap.groupByKey().flatMap(x => getTopDenseN(x, 2))
denserank.take(10).foreach(println)
Sample output:
66,4,SOLE F85 Treadmill,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
60,4,SOLE E25 Elliptical,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
162,8,YETI Tundra 65 Chest Cooler,399.99,http://images.acmesports.sports/YETI+Tundra+65+Chest+Cooler
153,8,Teeter Hang Ups NXT-S Inversion Table,299.99,http://images.acmesports.sports/Teeter+Hang+Ups+NXT-S+Inversion+Table
1279,57,PUMA Men’s evoPOWER 1 Tricks FG Soccer Cleat,189.99,http://images.acmesports.sports/PUMA+Men’s+evoPOWER+1+Tricks+FG+Soccer+Cleat
1297,57,PUMA Men’s evoSPEED 1.2 Tricks FG Soccer Clea,174.99,http://images.acmesports.sports/PUMA+Men’s+evoSPEED+1.2+Tricks+FG+Soccer+Cleat
450,20,Garmin Forerunner 220 GPS Watch,249.99,http://images.acmesports.sports/Garmin+Forerunner+220+GPS+Watch
452,20,Garmin vivofit Fitness Band with HRM,169.99,http://images.acmesports.sports/Garmin+vivofit+Fitness+Band+with+HRM
407,19,PUMA Men’s evoPOWER 1 Tricks FG Soccer Cleat,189.99,http://images.acmesports.sports/PUMA+Men’s+evoPOWER+1+Tricks+FG+Soccer+Cleat
425,19,Nike Men’s Air Max 2014 Running Shoe,149.99,http://images.acmesports.sports/Nike+Men’s+Air+Max+2014+Running+Shoe

0 Likes

#4

val productsrdd=sc.textFile("/public/retail_db/products")

val productid=productsrdd.filter(!_.split(",")(0).equals(“685”))
val productsMap= productsrdd.map(rec=>(rec.split(",")(1),rec))
(2,1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy)
(2,2,2,Under Armour Men’s Highlight MC Football Clea,129.99,http://images.acmesports.sports/Under+Armour+Men’s+Highlight+MC+Football+Cleat)
(2,3,2,Under Armour Men’s Renegade D Mid Football Cl,89.99,http://images.acmesports.sports/Under+Armour+Men’s+Renegade+D+Mid+Football+Cleat)

val productGroupby=productsMap.groupByKey()
(4,CompactBuffer(49,4,Diamondback Adult Sorrento Mountain Bike 2014,299.98,http://images.acmesports.sports/Diamondback+Adult+Sorrento+Mountain+Bike+2014, 50,4,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy, 51,4,MAC Sports Collapsible Wagon,69.99,http://images.acmesports.sports/MAC+Sports+Collapsible+Wagon, 52,4,Easton Mako Youth Bat 2014 (-11),249.97,http://images.acmesports.sports/Easton+Mako+Youth+Bat+2014+(-11), 53,4,adidas Brazuca 2014 Top Glider Soccer Ball,29.99,http://images.acmesports.sports/adidas+Brazuca+2014+Top+Glider+Soccer+Ball, 54,4,Nike+ Fuelband SE,99.0,http://images.acmesports.sports/Nike%2B+Fuelband+SE, 55,4,adidas Brazuca 2014 Top Repliqué Soccer Ball,39.99,http://images.acmesports.sports/adidas+Brazuca+2014+Top+Repliqué+Soccer+Ball, 56,4,Fitbit Flex Wireless Activity & Sleep Wristba,99.95,http://images.acmesports.sports/Fitbit+Flex+Wireless+Activity+%26+Sleep+Wristband, 57,4,“Nike Women’s Pro Core 3"” Compression Shorts",28.0,http://images.acmesports.sports/Nike+Women’s+Pro+Core+3"+Compression+Shorts, 58,4,Diamondback Boys’ Insight 24 Performance Hybr,299.99,http://images.acmesports.sports/Diamondback+Boys’+Insight+24+Performance+Hybrid+Bike+2014, 59,4,adidas Brazuca 2014 Official Match Ball,159.99,http://images.acmesports.sports/adidas+Brazuca+2014+Official+Match+Ball, 60,4,SOLE E25 Elliptical,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical, 61,4,Diamondback Girls’ Clarity 24 Hybrid Bike 201,299.99,http://images.acmesports.sports/Diamondback+Girls’+Clarity+24+Hybrid+Bike+2014, 62,4,Easton XL1 Youth Bat 2014 (-10),179.97,http://images.acmesports.sports/Easton+XL1+Youth+Bat+2014+(-10), 63,4,Fitness Gear 300 lb Olympic Weight Set,209.99,http://images.acmesports.sports/Fitness+Gear+300+lb+Olympic+Weight+Set, 64,4,Nike Women’s Pro Victory Compression Bra,21.99,http://images.acmesports.sports/Nike+Women’s+Pro+Victory+Compression+Bra, 65,4,Quik Shade Summit SX170 10 FT. x 10 FT. Canop,199.99,http://images.acmesports.sports/Quik+Shade+Summit+SX170+10+FT.+x+10+FT.+Canopy, 66,4,SOLE F85 Treadmill,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill, 67,4,Kijaro Dual Lock Chair,29.99,http://images.acmesports.sports/Kijaro+Dual+Lock+Chair, 68,4,Diamondback Adult Outlook Mountain Bike 2014,309.99,http://images.acmesports.sports/Diamondback+Adult+Outlook+Mountain+Bike+2014, 69,4,Easton S1 Youth Bat 2014 (-12),179.97,http://images.acmesports.sports/Easton+S1+Youth+Bat+2014+(-12), 70,4,Elevation Training Mask 2.0,79.99,http://images.acmesports.sports/Elevation+Training+Mask+2.0, 71,4,Diamondback Adult Response XE Mountain Bike 2,349.98,http://images.acmesports.sports/Diamondback+Adult+Response+XE+Mountain+Bike+2014, 72,4,Quest 12’ x 12’ Dome Canopy,149.99,http://images.acmesports.sports/Quest+12’+x+12’+Dome+Canopy))

def getTopSparseN(rec:(String,Iterable[String]),topN:Int):Iterable[String] ={
val sortedRDDs=rec._2.toList.sortBy(k=> -k.split(",")(4).toFloat)
return sortedRDDs.take(4)
}

val sortedRDD=productGroupby.flatMap(rec=>getTopN(rec,4))

66,4,SOLE F85 Treadmill,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
60,4,SOLE E25 Elliptical,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
71,4,Diamondback Adult Response XE Mountain Bike 2,349.98,http://images.acmesports.sports/Diamondback+Adult+Response+XE+Mountain+Bike+2014
68,4,Diamondback Adult Outlook Mountain Bike 2014,309.99,http://images.acmesports.sports/Diamondback+Adult+Outlook+Mountain+Bike+2014

0 Likes

#5

#Code

package training

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import com.typesafe.config.ConfigFactory

case class Orders(id: Int, date: String, customerID: Int, status: String)
case class OrderItems(id: Int, orderId: Int, productId: Int, quantity: Int, subtotal: Float, productPrice: Float)
case class Products(id: Int, categoryId: Int, name: String, description: String, price: Float, image: String)

object FromTraining {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val appConfig = ConfigFactory.load()

    val master = appConfig.getConfig(args(2)).getString("deployment")

    val conf = new SparkConf().setMaster(master).setAppName("Spark on Retail_db")
    val sc = new SparkContext(conf)

    val inputPath = args(0)
    val outputPath = args(1)

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))

    if (!inputPathExists) {
      println("Input Path does not exists")
      return
    }

    if (outputPathExists) {
      fs.delete(new Path(outputPath), true)
    }

    val orders = sc.textFile(inputPath + "/orders").flatMap { each =>
      each.split(",") match {
        case Array(id, date, customerId, status) => Some(Orders(id.toInt, date, customerId.toInt, status))
        case _ => None
      }
    }
    val orderItems = sc.textFile(inputPath + "/order_items").flatMap { each =>
      each.split(",") match {
        case Array(id, orderId, productId, quantity, subtotal, productPrice) => Some(OrderItems(id.toInt, orderId.toInt, productId.toInt, quantity.toInt, subtotal.toFloat, productPrice.toFloat))
        case _ => None
      }
    }

    val products = sc.textFile(inputPath + "/products").flatMap { each =>
      each.split(",") match {
        case Array(id, category_id, name, description, price, image) => Some(Products(id.toInt, category_id.toInt, name, description, price.toFloat, image))
        case _ => None
      }
    }

    //  Excersice 16

    val completedOrders = orders.filter(eachOrder => eachOrder.status.contains("COMPLETE"))

    val ordersToFilter = completedOrders.map(eachOrder => (eachOrder.id, eachOrder.date))

    val orderItemsToFilter = orderItems.map(eachOrderItem => (eachOrderItem.orderId, eachOrderItem.subtotal))

    val joinedOrders = orderItemsToFilter.join(ordersToFilter)

    val afterJoin = joinedOrders.flatMap {
      case (orderId, (subTotal, orderDate)) => Some(orderDate, subTotal)
    }

    val aggregatedResult = afterJoin.aggregateByKey((0.0, 0))((a, v) => (a._1 + v, a._2 + 1), (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))

    val averageResultsForEachDate = aggregatedResult.map {
      case (date, (totalrevenue, totalDays)) =>
        (date, totalrevenue / totalDays)
    }

    val sortedResult = averageResultsForEachDate.sortByKey(ascending = false) //.take(5).foreach(println)

    //   sortedResult.saveAsTextFile(outputPath)

    // Excersize 17

    val filteredProduct = products.filter(_.id != 685)

    val mappedProducts = filteredProduct.map(product => (product.categoryId, product))

    val groupedMappedProducts = mappedProducts.groupByKey()

    val flattenedGroupMappedProducts = groupedMappedProducts.flatMap(each => getTopDense(each, 2))

    flattenedGroupMappedProducts.saveAsTextFile(outputPath + "/excersize17")

  }

  def getTopDense(rec: (Int, Iterable[Products]), topN: Int): List[Products] = {
    var productPrices: List[Float] = List()
    var topNPrices: List[Float] = List()
    var sortedRecords: List[Products] = List()

    for (eachProduct <- rec._2) {
      productPrices = productPrices :+ eachProduct.price
    }

    topNPrices = productPrices.distinct.sortBy(price => -price).take(topN)

    var ranked: List[Products] = List()

    sortedRecords = rec._2.toList.sortBy(product => product.price)
    for (each <- sortedRecords) {
      if (topNPrices.contains(each.price))
        ranked = ranked :+ each
    }
    ranked
  }

}

#Sample output

Products(745,34,Ogio City Spiked Golf Shoes,,149.99,http://images.acmesports.sports/Ogio+City+Spiked+Golf+Shoes)
Products(746,34,Ogio City Spiked Golf Shoes,,149.99,http://images.acmesports.sports/Ogio+City+Spiked+Golf+Shoes)
Products(747,34,Ogio City Spiked Golf Shoes,,149.99,http://images.acmesports.sports/Ogio+City+Spiked+Golf+Shoes)
Products(754,34,TRUE linkswear Lyt Dry Golf Shoes,,149.99,http://images.acmesports.sports/TRUE+linkswear+Lyt+Dry+Golf+Shoes)
Products(755,34,TRUE linkswear Lyt Dry Golf Shoes,,149.99,http://images.acmesports.sports/TRUE+linkswear+Lyt+Dry+Golf+Shoes)
Products(756,34,TRUE linkswear Lyt Dry Golf Shoes,,149.99,http://images.acmesports.sports/TRUE+linkswear+Lyt+Dry+Golf+Shoes)
Products(743,34,Ogio Race Golf Shoes,,169.99,http://images.acmesports.sports/Ogio+Race+Golf+Shoes)
Products(744,34,Ogio Race Golf Shoes,,169.99,http://images.acmesports.sports/Ogio+Race+Golf+Shoes)
1 Like

#6

Code Snippet:

val products = sc.textFile("/public/retail_db/products")

val filteredProducts = products.filter(!_.split(",")(0).equals(“685”))

val productsMap = filteredProducts.map(rec => (rec.split(",")(1), rec))

val productsGroupBy = productsMap.groupByKey()

val topPricedProducts = productsGroupBy.flatMap(rec => getTopDenseN(rec,5))

Funtion Logic:

   def getTopDenseN(rec:( String,Iterable[String] ), topN:Int): Iterable[String] ={
           val sortedRecords = rec._2.toList.sortBy(k => -k.split(",")(4).toFloat )
           return sortedRecords.take(5)
    }

Sample Output:

66,4,SOLE F85 Treadmill,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
60,4,SOLE E25 Elliptical,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
71,4,Diamondback Adult Response XE Mountain Bike 2,349.98,http://images.acmesports.sports/Diamondback+Adult+Response+XE+Mountain+Bike+2014
68,4,Diamondback Adult Outlook Mountain Bike 2014,309.99,http://images.acmesports.sports/Diamondback+Adult+Outlook+Mountain+Bike+2014
58,4,Diamondback Boys’ Insight 24 Performance Hybr,299.99,http://images.acmesports.sports/Diamondback+Boys’+Insight+24+Performance+Hybrid+Bike+2014
162,8,YETI Tundra 65 Chest Cooler,399.99,http://images.acmesports.sports/YETI+Tundra+65+Chest+Cooler
153,8,Teeter Hang Ups NXT-S Inversion Table,299.99,http://images.acmesports.sports/Teeter+Hang+Ups+NXT-S+Inversion+Table
148,8,Quik Shade Summit SX170 10 FT. x 10 FT. Canop,199.99,http://images.acmesports.sports/Quik+Shade+Summit+SX170+10+FT.+x+10+FT.+Canopy
150,8,Quest 12’ x 12’ Dome Canopy,149.99,http://images.acmesports.sports/Quest+12’+x+12’+Dome+Canopy
155,8,E-Z Up Vista 12’ x 12’ Recreational Instant S,134.99,http://images.acmesports.sports/E-Z+Up+Vista+12’+x+12’+Recreational+Instant+Shelter
1279,57,PUMA Men’s evoPOWER 1 Tricks FG Soccer Cleat,189.99,http://images.acmesports.sports/PUMA+Men’s+evoPOWER+1+Tricks+FG+Soccer+Cleat
1297,57,PUMA Men’s evoSPEED 1.2 Tricks FG Soccer Clea,174.99,http://images.acmesports.sports/PUMA+Men’s+evoSPEED+1.2+Tricks+FG+Soccer+Cleat
1281,57,adidas Brazuca 2014 Official Match Ball,159.99,http://images.acmesports.sports/adidas+Brazuca+2014+Official+Match+Ball
1289,57,adidas Brazuca Final Rio Official Match Ball,159.99,http://images.acmesports.sports/adidas+Brazuca+Final+Rio+Official+Match+Ball
1294,57,adidas Men’s Copa Mundial Soccer Cleat,134.99,http://images.acmesports.sports/adidas+Men’s+Copa+Mundial+Soccer+Cleat
450,20,Garmin Forerunner 220 GPS Watch,249.99,http://images.acmesports.sports/Garmin+Forerunner+220+GPS+Watch
452,20,Garmin vivofit Fitness Band with HRM,169.99,http://images.acmesports.sports/Garmin+vivofit+Fitness+Band+with+HRM
437,20,ASICS Women’s GEL-Noosa Tri 9 Running Shoe,139.99,http://images.acmesports.sports/ASICS+Women’s+GEL-Noosa+Tri+9+Running+Shoe
451,20,Garmin Women’s Forerunner 10 GPS Watch,129.99,http://images.acmesports.sports/Garmin+Women’s+Forerunner+10+GPS+Watch
439,20,ASICS Women’s GEL-Nimbus 15 Running Shoe,119.99,http://images.acmesports.sports/ASICS+Women’s+GEL-Nimbus+15+Running+Shoe

0 Likes

#7

package org.test.spark

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import com.typesafe.config.ConfigFactory

object GetTopN {
def main(args: Array[String]): Unit = {
val appConf=ConfigFactory.load()
val conf=new SparkConf().setAppName(“spark exercise 17”)
.setMaster(appConf.getConfig(args(2)).getString(“deployment”))
val sc=new SparkContext(conf)
val inputPath = args(0)
val outputPath = args(1)
val fs = FileSystem.get(sc.hadoopConfiguration)
val inputPathExists = fs.exists(new Path(inputPath))
val outputPathExists = fs.exists(new Path(outputPath))
if (!inputPathExists) {
println(“Input Path does not exists”)
return
}

if (outputPathExists) {
  fs.delete(new Path(outputPath), true)
}
val products = sc.textFile(inputPath + "/products")
val pfiltered=products.filter { x => !x.split(",")(0).equals("685") }
val productsMap = pfiltered.map(rec => (rec.split(",")(1), rec))
val productsGroupBy = productsMap.groupByKey()

def getTopValues(rec:(String,Iterable[String]),topN:Int):Iterable[String] ={
val psort=rec._2.toList.sortBy(k=> -k.split(",")(4).toFloat)
return psort
}
val sort=productsGroupBy.flatMap(rec=>getTopValues(rec,4))
sort.saveAsTextFile(outputPath)

sc.stop()
}
}

0 Likes

#8

def getTopDenseN(rec: (Int, Iterable[String]), topN: Int): Iterable[String] = {
var prodPrices: List[Float] = List()
var topNPrices: List[Float] = List()
var sortedRecs: List[String] = List()
for (i <- rec._2) {
prodPrices = prodPrices :+ i.split(",")(4).toFloat
}
topNPrices = prodPrices.distinct.sortBy(k => -k).take(topN)
sortedRecs = rec._2.toList.sortBy(k => -k.split(",")(4).toFloat)
var x: List[String] = List()
for (i <- sortedRecs) {
if (topNPrices.contains(i.split(",")(4).toFloat))
x = x :+ i
}
return x

}

val products = sc.textFile(inputPath + “/products”)
val filteredProducts = products.filter(rec => (!rec.split(",")(4).isEmpty))
val productsMap = filteredProducts.map(rec => ((rec.split(",")(1).toInt), rec))
val productsGroupByKey = productsMap.groupByKey()
val sortedFlatMap = productsGroupByKey.flatMap(rec => (getTopDenseN(rec, 10)))


743,34,Ogio Race Golf Shoes,169.99,http://images.acmesports.sports/Ogio+Race+Golf+Shoes
744,34,Ogio Race Golf Shoes,169.99,http://images.acmesports.sports/Ogio+Race+Golf+Shoes
745,34,Ogio City Spiked Golf Shoes,149.99,http://images.acmesports.sports/Ogio+City+Spiked+Golf+Shoes
746,34,Ogio City Spiked Golf Shoes,149.99,http://images.acmesports.sports/Ogio+City+Spiked+Golf+Shoes
747,34,Ogio City Spiked Golf Shoes,149.99,http://images.acmesports.sports/Ogio+City+Spiked+Golf+Shoes
754,34,TRUE linkswear Lyt Dry Golf Shoes,149.99,http://images.acmesports.sports/TRUE+linkswear+Lyt+Dry+Golf+Shoes
755,34,TRUE linkswear Lyt Dry Golf Shoes,149.99,http://images.acmesports.sports/TRUE+linkswear+Lyt+Dry+Golf+Shoes
756,34,TRUE linkswear Lyt Dry Golf Shoes,149.99,http://images.acmesports.sports/TRUE+linkswear+Lyt+Dry+Golf+Shoes
759,34,Nike Lunarwaverly Golf Shoes,139.99,http://images.acmesports.sports/Nike+Lunarwaverly+Golf+Shoes
760,34,Nike Lunarwaverly Golf Shoes,139.99,http://images.acmesports.sports/Nike+Lunarwaverly+Golf+Shoes
761,34,Nike Lunarwaverly Golf Shoes,139.99,http://images.acmesports.sports/Nike+Lunarwaverly+Golf+Shoes
762,34,Nike Lunarwaverly Golf Shoes,139.99,http://images.acmesports.sports/Nike+Lunarwaverly+Golf+Shoes
748,34,Ogio City Turf Golf Shoes,129.99,http://images.acmesports.sports/Ogio+City+Turf+Golf+Shoes

0 Likes

#9

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

object SparkExercise17 {

def main(args: Array[String]) {
val appConf = ConfigFactory.load()
val conf = new SparkConf().setAppName(“Sneh Exercise 17”).setMaster(appConf.getConfig(args(2)).getString(“deployment”))
val sc = new SparkContext(conf)

val inputPath = args(0)
val outputPath = args(1)

val fs = FileSystem.get(sc.hadoopConfiguration)
val inputPathExists = fs.exists(new Path(inputPath))
val outputPathExists = fs.exists(new Path(outputPath))

if (!inputPathExists) {
  println("Input Path does not exists")
  return
}

if (outputPathExists) {
  fs.delete(new Path(outputPath), true)
}

val products = sc.textFile(inputPath+"/products")

val productsFiltererd = products.filter(rec => (!rec.split(",")(4).isEmpty))
val productsMapped = productsFiltererd.map(rec => (rec.split(",")(1).toInt, rec))
val productsGrpByKey = productsMapped.groupByKey()
val productsByCatId = productsGrpByKey.flatMap(rec => getTopDenseN(rec, 10))
productsByCatId.saveAsTextFile(outputPath)

sc.stop()

}

def getTopDenseN(rec: (Int, Iterable[String]), topN: Int): Iterable[String] = {
var prodPrices: List[Float] = List()
var topNPrices: List[Float] = List()
var sortedRecs: List[String] = List()
for (i <- rec._2) {
prodPrices = prodPrices :+ i.split(",")(4).toFloat
}
topNPrices = prodPrices.distinct.sortBy(k => -k).take(topN)
sortedRecs = rec._2.toList.sortBy(k => -k.split(",")(4).toFloat)
var x: List[String] = List()
for (i <- sortedRecs) {
if (topNPrices.contains(i.split(",")(4).toFloat))
x = x :+ i
}
return x
}

}

0 Likes

#10

package com.scala.avgrvn

import org.apache.spark.SparkConf
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkContext

object TopN {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName(“TopN”);
val sc = new SparkContext(conf);
val product=sc.textFile("/user/saswat232/retail_db/products")
val prdct_fltr = product.filter(!_.split(",")(0).equals(“685”))
val prdct_Map = prdct_fltr.map(rec => (rec.split(",")(4).toDouble, rec))
val prdct_grp = prdct_Map.groupByKey()
val top_priced = prdct_grp.flatMap(rec => getTopPrc(rec, 6))
top_priced.saveAsTextFile("/user/saswat232/retail_db/TopN")
}
def getTopPrc(rec: (Double,Iterable[String] ), TopN: Int) : Iterable[String]=
{
var prodPrices: List[Float] = List()
var topNPrices: List[Float] = List()
var sortedRecs: List[String] = List()
for(i <- rec._2) {
prodPrices = prodPrices:+ i.split(",")(4).toFloat
}
topNPrices = prodPrices.distinct.sortBy(k => -k).take(TopN)
sortedRecs = rec._2.toList.sortBy(k => -k.split(",")(4).toFloat)
var x: List[String] = List()
for(i <- sortedRecs) {
if(topNPrices.contains(i.split(",")(4).toFloat))
x = x:+ i
}
return x
}

}

Result

789,36,TaylorMade Men’s Burner LTD Golf Glove,12.99,http://images.acmesports.sports/TaylorMade+Men’s+Burner+LTD+Golf+Glove
796,36,Callaway Women’s X-Tech Golf Glove - White/Pi,12.99,http://images.acmesports.sports/Callaway+Women’s+X-Tech+Golf+Glove+-+White%2FPink
814,37,Callaway HX Practice Golf Balls - 9 Pack,12.99,http://images.acmesports.sports/Callaway+HX+Practice+Golf+Balls+-+9+Pack
216,10,Yakima DoubleDown Ace Hitch Mount 4-Bike Rack,189.0,http://images.acmesports.sports/Yakima+DoubleDown+Ace+Hitch+Mount+4-Bike+Rack
952,43,Yakima DoubleDown Ace Hitch Mount 4-Bike Rack,189.0,http://images.acmesports.sports/Yakima+DoubleDown+Ace+Hitch+Mount+4-Bike+Rack
516,24,Callaway X Hot Laser Rangefinder,229.99,http://images.acmesports.sports/Callaway+X+Hot+Laser+Rangefinder
545,25,Callaway X Hot Laser Rangefinder,229.99,http://images.acmesports.sports/Callaway+X+Hot+Laser+Rangefinder
599,41,Callaway X Hot Laser Rangefinder,229.99,http://images.acmesports.sports/Callaway+X+Hot+Laser+Rangefinder
1064,48,Pelican Trailblazer 100 Kayak,229.99,http://images.acmesports.sports/Pelican+Trailblazer+100+Kayak
1071,48,Pelican Apex 100 Kayak,229.99,http://images.acmesports.sports/Pelican+Apex+100+Kayak
1091,49,Pelican Trailblazer 100 Kayak,229.99,http://images.acmesports.sports/Pelican+Trailblazer+100+Kayak
296,38,Nike+ Sportwatch GPS Powered by TomTom with S,169.0,http://images.acmesports.sports/Nike%2B+Sportwatch+GPS+Pow

0 Likes

#11

Function logic

   def denseTopN(rec: (String, Iterable[String]), topN: Int): Iterable[String] = {
      var prodPrices: List[Float] = List()
      var topNPrices: List[Float] = List()
      var sortedRecs: List[String] = List()
      for (i <- rec._2) {
        prodPrices = prodPrices :+ i.split(",")(4).toFloat
      }
      topNPrices = prodPrices.distinct.sortBy(k => -k).take(topN)
      sortedRecs = rec._2.toList.sortBy(k => -k.split(",")(4).toFloat)
      var x: List[String] = List()
      for (i <- sortedRecs) {
        if (topNPrices.contains(i.split(",")(4).toFloat))
          x = x :+ i
      }
      return x
    }

Code snippet

val loadFile = sc.textFile(inputpath1.toString())
val filterFile = loadFile.filter(rec => rec.split(",")(0).toInt != 685)

val mapFile = filterFile.map(rec => (rec.split(",")(0), rec))

val groupFile = mapFile.groupByKey()

val topPricedProducts = groupFile.flatMap(rec => denseTopN(rec, topN))

topPricedProducts.saveAsTextFile(outputpath.toString())

Sample output

273,13,Under Armour Kids' Mercenary Slide,,27.99,http://images.acmesports.sports/Under+Armour+Kids%27+Mercenary+Slide
253,12,Nike Women's Free 5.0+ Running Shoe,,99.99,http://images.acmesports.sports/Nike+Women%27s+Free+5.0%2B+Running+Shoe
1148,51,Team Golf Pittsburgh Steelers Fairway Stand B,,179.99,http://images.acmesports.sports/Team+Golf+Pittsburgh+Steelers+Fairway+Stand+Bag
1119,50,Majestic Men's 2014 All-Star Game Yadier Moli,,130.0,http://images.acmesports.sports/Majestic+Men%27s+2014+All-Star+Game+Yadier+Molina+%234+National...
282,13,Under Armour Women's Ignite PIP VI Slide,,31.99,http://images.acmesports.sports/Under+Armour+Women%27s+Ignite+PIP+VI+Slide
82,5,Kijaro Dual Lock Chair,,29.99,http://images.acmesports.sports/Kijaro+Dual+Lock+Chair
0 Likes