Find the following from the given products.csv (for those preparing for CCA175):

products.csv contains:
storeID,productCategory,Product Id,Product_Price
100,Diary,1001,5.00
100,Diary,1002,1.50
100,Diary,1003,2.50
100,Diary,1004,0.75
100,Diary,1005,6.50
100,Diary,1006,6.50
100,Diary,1007,20.25
100,Diary,1008,50.00
100,Diary,1009,10.00
100,Diary,1010,1.50
101,Diary,101,2.50
101,Diary,102,15.00
101,Diary,103,12.50
101,Diary,104,3.00
101,Diary,105,16.50
101,Diary,106,31.00
101,Diary,107,7.80
101,Diary,108,45.00
101,Diary,109,9.95
101,Diary,110,1.25
100,Meat,2001,15.00
100,Meat,2002,25.00
100,Meat,2003,25.00
100,Meat,2004,13.00
100,Meat,2005,9.00
100,Meat,2006,26.00
100,Meat,2007,12.00
100,Meat,2008,13.00
100,Meat,2009,75.00
100,Meat,2010,15.00
101,Meat,201,14.00
101,Meat,202,23.00
101,Meat,203,21.00
101,Meat,204,13.00
101,Meat,205,10.00
101,Meat,206,25.00
101,Meat,207,11.00
101,Meat,208,13.00
101,Meat,209,80.00
101,Meat,210,1.25
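As listed, the file starts with a header line. `float("Product_Price")` raises ValueError, so any answer that calls float() on every line assumes a file without that header. In PySpark a common way to drop it is `header = products.first()` followed by `products.filter(lambda rec: rec != header)`. The sketch below shows the same idea in plain Python (no Spark needed) on a few rows in the layout above, converting each field to its type.

```python
import csv
import io

# A few lines in the products.csv layout shown above, header included.
raw = """storeID,productCategory,Product Id,Product_Price
100,Diary,1001,5.00
100,Diary,1002,1.50
101,Meat,209,80.00
"""

reader = csv.reader(io.StringIO(raw))
header = next(reader)  # consume the header row before any numeric conversion

# Convert every remaining row to (storeId, category, productId, price).
products = [
    (int(store), category, int(product_id), float(price))
    for store, category, product_id, price in reader
]

print(header)
for row in products:
    print(row)
```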

  1. Find the maximum / Minimum priced product
  2. Find the maximum / Minimum priced product at the category Name level
  3. Find the maximum / Minimum priced product at the category Name level and Store Level
  4. Find the top 5 priced products
  5. Find the Top 3 Priced Products based on Store Only
  6. Find the Top 3 Priced Products based on Store and Category Only

Good question; it offers good practice on Spark.


Good basic scenarios from an exam point of view. Can I have the complete products.csv dataset?

I have manipulated this dataset. You can get a similar one from the retail_db.products table in MySQL on the Cloudera QuickStart VM.

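Good questions to practice. Following are the answers I came up with; you may want to try them yourself first. In the snippets below, products is an RDD of the data lines (header excluded), for example created with sc.textFile.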

  1. Find the maximum / Minimum priced product

Minimum priced product:

products.map(lambda rec: (float(rec.split(",")[3]), rec)).sortByKey().first()

Maximum priced product (sort in descending order so the highest price comes first):

products.map(lambda rec: (float(rec.split(",")[3]), rec)).sortByKey(False).first()
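Sorting the whole RDD only to read its first element does more work than needed. PySpark RDDs also provide `max(key=...)` and `min(key=...)`, which scan the data once, much like the `reduce` in the Scala answer. The plain-Python sketch below shows that single-pass idea on a few rows copied from the sample. It uses a subset so it runs without Spark, and `price` is a helper introduced here for illustration.

```python
# A few rows copied from the sample products data (subset, for illustration).
rows = [
    "100,Diary,1004,0.75",
    "100,Diary,1008,50.00",
    "101,Meat,209,80.00",
    "101,Meat,210,1.25",
    "100,Meat,2005,9.00",
]

def price(rec):
    """Extract the price (4th comma-separated field) as a float."""
    return float(rec.split(",")[3])

# Single pass over the data, no sort needed.
# PySpark analogue (not run here): products.max(key=price) / products.min(key=price)
max_product = max(rows, key=price)
min_product = min(rows, key=price)

print("Max:", max_product)
print("Min:", min_product)
```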

  2. Find the maximum / Minimum priced product at the category Name level

import itertools
prodGroupBy = products.map(lambda rec: (rec.split(",")[1], rec)).groupByKey()

Maximum priced product by category name:

final = prodGroupBy.map(lambda rec: (rec[0], list(itertools.islice(list(sorted(rec[1], key=lambda k: float(k.split(",")[3]), reverse=True)),0,1)))).collect()
for a in final: print(a)

Output:
(u'Diary', [u'100,Diary,1008,50.00'])
(u'Meat', [u'101,Meat,209,80.00'])

Minimum priced product by category name:

final = prodGroupBy.map(lambda rec: (rec[0], list(itertools.islice(list(sorted(rec[1], key=lambda k: float(k.split(",")[3]), reverse=False)),0,1)))).collect()
for a in final: print(a)

Output:
(u'Diary', [u'100,Diary,1004,0.75'])
(u'Meat', [u'101,Meat,210,1.25'])
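groupByKey sends every record of a category to one place before sorting. When you only need the single cheapest or most expensive product per key, reduceByKey (the approach in the Scala answer) keeps just one record per key while merging. The sketch below imitates that merge in plain Python. `reduce_by_key` is a hypothetical helper standing in for the RDD method, and the rows are a subset of the sample.

```python
# Subset of the sample products data (illustrative only).
rows = [
    "100,Diary,1001,5.00",
    "100,Diary,1004,0.75",
    "100,Diary,1008,50.00",
    "101,Meat,209,80.00",
    "101,Meat,210,1.25",
    "100,Meat,2005,9.00",
]

def price(rec):
    return float(rec.split(",")[3])

def reduce_by_key(pairs, func):
    """Hypothetical stand-in for RDD.reduceByKey: merge the values of each key with func."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

# Key each record by category name (field 1), like products.map(lambda rec: (rec.split(",")[1], rec)).
pairs = [(rec.split(",")[1], rec) for rec in rows]

# Keep only the higher- (or lower-) priced record of each pair while merging.
# PySpark analogue (not run here): pairsRdd.reduceByKey(lambda a, b: a if price(a) >= price(b) else b)
max_by_category = reduce_by_key(pairs, lambda a, b: a if price(a) >= price(b) else b)
min_by_category = reduce_by_key(pairs, lambda a, b: a if price(a) <= price(b) else b)

for category in sorted(max_by_category):
    print(category, "max:", max_by_category[category], "min:", min_by_category[category])
```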

  3. Find the maximum / Minimum priced product at the category Name level and Store Level

prodGroupBy = products.map(lambda rec: (rec.split(",")[0] +","+ rec.split(",")[1], rec)).groupByKey()

Maximum priced product by category name and store:

final = prodGroupBy.map(lambda rec: (rec[0], list(itertools.islice(list(sorted(rec[1], key=lambda k: float(k.split(",")[3]), reverse=True)),0,1))))
for a in final.collect(): print(a)

Output:
(u'101,Diary', [u'101,Diary,108,45.00'])
(u'100,Meat', [u'100,Meat,2009,75.00'])
(u'100,Diary', [u'100,Diary,1008,50.00'])
(u'101,Meat', [u'101,Meat,209,80.00'])

Minimum priced product by category name and store:

final = prodGroupBy.map(lambda rec: (rec[0], list(itertools.islice(list(sorted(rec[1], key=lambda k: float(k.split(",")[3]), reverse=False)),0,1))))
for a in final.collect(): print(a)

Output:
(u'101,Diary', [u'101,Diary,110,1.25'])
(u'100,Meat', [u'100,Meat,2005,9.00'])
(u'100,Diary', [u'100,Diary,1004,0.75'])
(u'101,Meat', [u'101,Meat,210,1.25'])
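Concatenating store and category into a string such as "101,Diary" works. A tuple key like (storeId, category) keeps the two fields separate, though, so they can be read back or sorted without splitting the key again. PySpark accepts tuples as keys for groupByKey and reduceByKey. Below is a plain-Python sketch of the store-and-category minimum with a tuple key, using a subset of the sample rows.

```python
# Subset of the sample products data (illustrative only).
rows = [
    "100,Diary,1004,0.75",
    "100,Diary,1008,50.00",
    "101,Diary,108,45.00",
    "101,Diary,110,1.25",
    "100,Meat,2005,9.00",
    "100,Meat,2009,75.00",
    "101,Meat,209,80.00",
    "101,Meat,210,1.25",
]

def price(rec):
    return float(rec.split(",")[3])

# Composite key: a (storeId, category) tuple instead of a "store,category" string.
cheapest = {}
for rec in rows:
    store, category = rec.split(",")[:2]
    key = (store, category)
    if key not in cheapest or price(rec) < price(cheapest[key]):
        cheapest[key] = rec

# Tuple keys sort by store first, then by category.
for (store, category), rec in sorted(cheapest.items()):
    print(store, category, rec)
```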

  4. Find the top 5 priced products

for a in products.map(lambda rec: (float(rec.split(",")[3]), rec)).sortByKey(False).map(lambda rec: rec[1]).take(5): print(a)

Output:
101,Meat,209,80.00
100,Meat,2009,75.00
100,Diary,1008,50.00
101,Diary,108,45.00
101,Diary,106,31.00
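Sorting the full RDD and then taking five elements works. PySpark's `takeOrdered(5, key=lambda rec: -float(rec.split(",")[3]))` or `top(5, key=...)` can pick the top N without a global sort, because they keep only a bounded set of candidates. Python's `heapq.nlargest` applies the same bounded-selection idea locally. Here is a sketch on a subset of the sample rows.

```python
import heapq

# Subset of the sample products data (illustrative only).
rows = [
    "101,Meat,209,80.00",
    "100,Diary,1004,0.75",
    "100,Meat,2009,75.00",
    "101,Diary,106,31.00",
    "100,Diary,1008,50.00",
    "100,Meat,2006,26.00",
    "101,Diary,108,45.00",
]

def price(rec):
    return float(rec.split(",")[3])

# Keeps at most 5 candidates in a heap instead of sorting everything.
top5 = heapq.nlargest(5, rows, key=price)
for rec in top5:
    print(rec)
```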

  5. Find the Top 3 Priced Products based on Store Only

prodGroupBy = products.map(lambda rec: (rec.split(",")[0], rec)).groupByKey()

final = prodGroupBy.map(lambda rec: (rec[0], list(itertools.islice(list(sorted(rec[1], key=lambda k: float(k.split(",")[3]), reverse=True)),0,3))))

for a in final.collect(): print(a)

Output:
(u'100', [u'100,Meat,2009,75.00', u'100,Diary,1008,50.00', u'100,Meat,2006,26.00'])
(u'101', [u'101,Meat,209,80.00', u'101,Diary,108,45.00', u'101,Diary,106,31.00'])

  6. Find the Top 3 Priced Products based on Store and Category Only

prodGroupBy = products.map(lambda rec: (rec.split(",")[0] +","+ rec.split(",")[1], rec)).groupByKey()
final = prodGroupBy.map(lambda rec: (rec[0], list(itertools.islice(list(sorted(rec[1], key=lambda k: float(k.split(",")[3]), reverse=True)),0,3))))

for a in final.collect(): print(a)

Output:
(u'101,Diary', [u'101,Diary,108,45.00', u'101,Diary,106,31.00', u'101,Diary,105,16.50'])
(u'100,Meat', [u'100,Meat,2009,75.00', u'100,Meat,2006,26.00', u'100,Meat,2002,25.00'])
(u'100,Diary', [u'100,Diary,1008,50.00', u'100,Diary,1007,20.25', u'100,Diary,1009,10.00'])
(u'101,Meat', [u'101,Meat,209,80.00', u'101,Meat,206,25.00', u'101,Meat,202,23.00'])
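The `list(itertools.islice(list(sorted(...)), 0, 3))` pattern can be shortened. Slicing the sorted list (`sorted(...)[:3]`) or calling `heapq.nlargest(3, ...)` yields the same three records, and in PySpark this could be written as `prodGroupBy.mapValues(lambda recs: heapq.nlargest(3, recs, key=...))`. Below is a plain-Python sketch of the per-group top 3, grouped by (store, category), on a subset of the sample rows.

```python
import heapq
from collections import defaultdict

# Subset of the sample products data (illustrative only).
rows = [
    "100,Meat,2001,15.00",
    "100,Meat,2002,25.00",
    "100,Meat,2003,25.00",
    "100,Meat,2006,26.00",
    "100,Meat,2009,75.00",
    "101,Diary,105,16.50",
    "101,Diary,106,31.00",
    "101,Diary,108,45.00",
    "101,Diary,109,9.95",
]

def price(rec):
    return float(rec.split(",")[3])

# Equivalent of groupByKey on a (store, category) key.
groups = defaultdict(list)
for rec in rows:
    store, category = rec.split(",")[:2]
    groups[(store, category)].append(rec)

# Equivalent of mapValues: keep the 3 highest-priced records of each group.
# On ties, nlargest keeps the earlier record first, like a stable descending sort.
top3 = {key: heapq.nlargest(3, recs, key=price) for key, recs in groups.items()}

for key in sorted(top3):
    print(key, top3[key])
```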


@prakash_chaudhary, based on the schema you can generate the data yourself with the help of this website:
Mockaroo - Random Data Generator
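If you would rather generate similar test data locally, a few lines of Python can produce rows with the same four-column schema. This is an alternative to Mockaroo, not a reproduction of it. The store IDs, categories and price range below are arbitrary assumptions rather than values taken from retail_db, and `make_products` is a hypothetical helper.

```python
import random

def make_products(n_rows, seed=42):
    """Generate hypothetical rows: storeId,productCategory,productId,productPrice."""
    rng = random.Random(seed)  # fixed seed so the output is reproducible
    stores = [100, 101]  # assumed store IDs
    categories = ["Diary", "Meat"]  # spelled as in the sample above
    rows = []
    for product_id in range(1, n_rows + 1):
        store = rng.choice(stores)
        category = rng.choice(categories)
        price = rng.uniform(0.5, 100.0)  # assumed price range
        rows.append(f"{store},{category},{product_id},{price:.2f}")
    return rows

sample = make_products(10)
print("\n".join(sample))
```

Writing the rows to a file with the usual file APIs would then give an input for sc.textFile.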


@pranav, I appreciate the effort. I tried it myself first :wink:

package com.mycompany.opensource.spark.core

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

// One line of the input file: storeId,categoryName,productId,productPrice
case class sales(storeId: Int, categoryName: String, productId: Int, productPrice: Double) {
  override def toString = s"$storeId,$categoryName,$productId,$productPrice"
}

/**
 * Created by email2dgk on 2/11/2017.
 */
object Top5Products {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("Top5Products").setMaster("local[4]")
    val sc = new SparkContext(conf)

    val inputFile = "sales.txt"

    // Parse each CSV line into a sales record (the file must not contain a header row)
    val inputRdd = sc.textFile(inputFile).map { x =>
      val z = x.split(",")
      new sales(z(0).toInt, z(1), z(2).toInt, z(3).toDouble)
    }

    // Maximum priced product: reduce keeps the more expensive record of each pair
    val maxPriceProductRdd = inputRdd.reduce {
      case (x, y) => if (x.productPrice > y.productPrice) x else y
    }
    println("Max Price Product")
    println(maxPriceProductRdd)

    // Minimum priced product: reduce keeps the cheaper record of each pair
    val minPriceProductRdd = inputRdd.reduce {
      case (x, y) => if (x.productPrice > y.productPrice) y else x
    }
    println("Min Price Product")
    println(minPriceProductRdd)

    // Category level: key by category name, then keep one record per key
    val categoryKvRdd = inputRdd.map(x => (x.categoryName, x))

    val categoryMaxPriceRdd = categoryKvRdd.reduceByKey {
      case (x, y) => if (x.productPrice > y.productPrice) x else y
    }
    println("Category level maximum Product")
    // collect() brings the results to the driver; calling foreach(println) on the RDD
    // itself prints on the executors, which on a cluster is not the driver console
    categoryMaxPriceRdd.collect().foreach(x => println(x._2))

    val categoryMinPriceRdd = categoryKvRdd.reduceByKey {
      case (x, y) => if (x.productPrice > y.productPrice) y else x
    }
    println("Category level Minimum Product")
    categoryMinPriceRdd.collect().foreach(x => println(x._2))

    // Category and store level: composite "storeId,categoryName" key
    val storeNCategoryKvRdd = inputRdd.map { x => (x.storeId + "," + x.categoryName, x) }

    val storeNCategoryMaxPriceRdd = storeNCategoryKvRdd.reduceByKey {
      case (x, y) => if (x.productPrice > y.productPrice) x else y
    }
    println("Category and Store Level Maximum Priced Product")
    storeNCategoryMaxPriceRdd.collect().foreach(x => println(x._2))

    val storeNCategoryMinPriceRdd = storeNCategoryKvRdd.reduceByKey {
      case (x, y) => if (x.productPrice > y.productPrice) y else x
    }
    println("Category and Store Level Minimum Priced Product")
    storeNCategoryMinPriceRdd.collect().foreach(x => println(x._2))

    // Top 5 priced products: sort by price descending into one partition, take 5 (a local Array)
    val top5Rdd = inputRdd.map { x => (x.productPrice, x) }.sortByKey(false, 1).take(5)
    println("top 5 Priced Products")
    top5Rdd.foreach(x => println(x._2))

    // Top 3 per store: sortBy(-price) orders each group by descending price
    println("Top 3 Priced Products based on Store Only")
    val storeKvRdd = inputRdd.map { x => (x.storeId, x) }
    val storeIdGroupRdd = storeKvRdd.groupByKey()
    storeIdGroupRdd.flatMap {
      _._2.toList.sortBy(k => -k.productPrice).take(3)
    }.collect().foreach(println)

    println("Top 3 Priced Products based on Store and Category Only")
    val storeCategoryKvRdd = inputRdd.map { x => (x.storeId + "," + x.categoryName, x) }
    val storeCategoryGroupRdd = storeCategoryKvRdd.groupByKey()
    storeCategoryGroupRdd.flatMap {
      _._2.toList.sortBy(k => -k.productPrice).take(3)
    }.collect().foreach(println)

    sc.stop()
  }
}

products = sc.textFile("/user/cloudera/pyspark/products.txt")

Max priced product

for i in products.takeOrdered(1, key = lambda rec: -float(rec.split(",")[3])): print(i)

101,Meat,209,80.00

Min priced product

for i in products.takeOrdered(1, key = lambda rec: float(rec.split(",")[3])): print(i)

100,Diary,1004,0.75

Find the maximum priced product at the category Name level

import itertools
productsMap = products.map(lambda rec: (rec.split(",")[1], rec))
productsGroupBy = productsMap.groupByKey()
for i in productsGroupBy.flatMap(lambda rec: list(itertools.islice(sorted(rec[1], key = lambda k: -float(k.split(",")[3])),0,1))).collect(): print(i)

100,Diary,1008,50.00
101,Meat,209,80.00

Find the Minimum priced product at the category Name level

for i in productsGroupBy.flatMap(lambda rec: list(itertools.islice(sorted(rec[1], key = lambda k: float(k.split(",")[3])),0,1))).collect(): print(i)

100,Diary,1004,0.75
101,Meat,210,1.25

Find the maximum priced product at the category Name level and Store Level

productsMap = products.map(lambda rec: ((rec.split(",")[1],rec.split(",")[0]), rec))
productsGroupBy = productsMap.groupByKey()
for i in productsGroupBy.flatMap(lambda rec: list(itertools.islice(sorted(rec[1], key = lambda k: -float(k.split(",")[3])),0,1))).collect(): print(i)

100,Diary,1008,50.00
101,Diary,108,45.00
100,Meat,2009,75.00
101,Meat,209,80.00

Find the Minimum priced product at the category Name level and Store Level

for i in productsGroupBy.flatMap(lambda rec: list(itertools.islice(sorted(rec[1], key = lambda k: float(k.split(",")[3])),0,1))).collect(): print(i)

100,Diary,1004,0.75
101,Diary,110,1.25
100,Meat,2005,9.00
101,Meat,210,1.25

Find the top 5 priced products

for i in products.takeOrdered(5, key = lambda rec: -float(rec.split(",")[3])): print(i)

101,Meat,209,80.00
100,Meat,2009,75.00
100,Diary,1008,50.00
101,Diary,108,45.00
101,Diary,106,31.00

Find the Top 3 Priced Products based on Store Only

productsGroupBy = products.map(lambda rec: (rec.split(",")[0], rec)).groupByKey()
for i in productsGroupBy.flatMap(lambda rec: list(itertools.islice(sorted(rec[1], key = lambda k: -float(k.split(",")[3])),0,3))).collect(): print(i)

100,Meat,2009,75.00
100,Diary,1008,50.00
100,Meat,2006,26.00
101,Meat,209,80.00
101,Diary,108,45.00
101,Diary,106,31.00

Find the Top 3 Priced Products based on Store and Category Only
productsMap = products.map(lambda rec: ((rec.split(",")[1],rec.split(",")[0]), rec))
productsGroupBy = productsMap.groupByKey()
for i in productsGroupBy.flatMap(lambda rec: list(itertools.islice(sorted(rec[1], key = lambda k: -float(k.split(",")[3])),0,3))).collect(): print(i)

100,Diary,1008,50.00
100,Diary,1007,20.25
100,Diary,1009,10.00
100,Meat,2009,75.00
100,Meat,2006,26.00
100,Meat,2002,25.00
101,Diary,108,45.00
101,Diary,106,31.00
101,Diary,105,16.50
101,Meat,209,80.00
101,Meat,206,25.00
101,Meat,202,23.00

In reply to email2dgk (post 7):

Hi,
You sorted the prices per storeId in ascending order, as below:
val storeIdGroupRdd = storeKvRdd.groupByKey()
storeIdGroupRdd.flatMap {
  _._2.toList.sortBy(k => -k.productPrice).take(3)
}.foreach(println)

But when I try to sort in descending order as below, it does not work:

val storeIdGroupRdd = storeKvRdd.groupByKey()
storeIdGroupRdd.flatMap {
  _._2.toList.sortBy(k => -k.productPrice, false).take(3)
}.foreach(println)

Please help to resolve.
Thanks
Suresh

Sorry, I only now noticed that -k.productPrice sorts in descending order and k.productPrice in ascending order. The minus sign is what matters.

The minus sign only works for numeric values, though. How can I sort a string field in descending order?
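Negation only works for numbers. In Scala, a descending sort on a string field can use a reversed ordering, e.g. `_._2.toList.sortBy(_.categoryName)(Ordering[String].reverse)`. The `false` flag in the earlier attempt belongs to `RDD.sortBy(f, ascending)`, not to Scala's `List.sortBy`. In PySpark, `rdd.sortBy(keyfunc, ascending=False)` handles any sortable key. The plain-Python sketch below shows the same idea with `reverse=True`. It also mixes directions (category descending, price ascending) with two stable sorts. The rows are a subset of the sample.

```python
# Subset of the sample products data (illustrative only).
rows = [
    "101,Meat,209,80.00",
    "100,Diary,1008,50.00",
    "101,Meat,210,1.25",
    "100,Diary,1004,0.75",
]

def category(rec):
    return rec.split(",")[1]

def price(rec):
    return float(rec.split(",")[3])

# Descending by a string field: negation is impossible, so use reverse=True.
by_category_desc = sorted(rows, key=category, reverse=True)

# Mixed directions: sort by the secondary key first, then by the primary key.
# Python's sort is stable (also with reverse=True), so the price order survives within each category.
mixed = sorted(sorted(rows, key=price), key=category, reverse=True)

for rec in mixed:
    print(rec)
```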

Hi @email2dgk, good questions to practice. I have a query regarding “6) Find the Top 3 Priced Products based on Store and Category Only”.
Should we use dense ranking here? In “100,Diary”, two different product IDs (1005 and 1006) have the same price (6.50). Please suggest.
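Whether ties should share a rank is a requirement decision; the question does not say. In the sample, the 6.50 tie in 100,Diary falls outside that group's top three (50.00, 20.25, 10.00). In 100,Meat, however, 2002 and 2003 tie at 25.00 inside the top three. If dense ranking is wanted, one option is to take the three highest distinct prices in each group and keep every product priced at one of them. In PySpark that could be applied per group with mapValues. Below is a plain-Python sketch of this assumed interpretation; `dense_top_n` is a hypothetical helper.

```python
from collections import defaultdict

# The 100,Meat rows from the sample; 2002 and 2003 tie at 25.00.
rows = [
    "100,Meat,2001,15.00",
    "100,Meat,2002,25.00",
    "100,Meat,2003,25.00",
    "100,Meat,2004,13.00",
    "100,Meat,2005,9.00",
    "100,Meat,2006,26.00",
    "100,Meat,2007,12.00",
    "100,Meat,2008,13.00",
    "100,Meat,2009,75.00",
    "100,Meat,2010,15.00",
]

def price(rec):
    return float(rec.split(",")[3])

def dense_top_n(recs, n):
    """Return every record whose price is among the n highest distinct prices."""
    top_prices = set(sorted({price(r) for r in recs}, reverse=True)[:n])
    return sorted((r for r in recs if price(r) in top_prices), key=price, reverse=True)

# Group by (store, category), as in question 6.
groups = defaultdict(list)
for rec in rows:
    store, category = rec.split(",")[:2]
    groups[(store, category)].append(rec)

result = {key: dense_top_n(recs, 3) for key, recs in groups.items()}
for rec in result[("100", "Meat")]:
    print(rec)
```

With dense ranking the 100,Meat group returns four products instead of three, because both 25.00 items share third place.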