Sort by Iterable value in a tuple using Scala

apache-spark
scala

#1

I have the following RDD:
org.apache.spark.rdd.RDD[(Int, Iterable[Float])]

How can this be sorted by the items in the iterable? Sample data below:
(13,CompactBuffer(79.99, 34.99, 44.99, 199.99, 34.99, 44.99, 27.99, 31.99, 27.99, 21.99, 27.99, 31.99, 34.99, 44.99, 69.99, 34.99, 99.99, 31.99, 27.99, 31.99, 34.99, 34.99, 44.99, 69.99))
(19,CompactBuffer(129.99, 99.0, 189.99, 99.95, 89.99, 99.99, 89.99, 134.99, 129.99, 0.0, 129.99, 99.99, 139.99, 89.99, 89.99, 129.99, 124.99, 129.99, 59.99, 139.99, 149.99, 129.99, 59.99, 139.99))
(34,CompactBuffer(59.99, 59.99, 169.99, 169.99, 149.99, 149.99, 149.99, 129.99, 129.99, 129.99, 99.99, 99.99, 99.99, 149.99, 149.99, 149.99, 99.99, 99.99, 139.99, 139.99, 139.99, 139.99, 34.99, 99.99))
(39,CompactBuffer(199.99, 179.99, 129.99, 129.99, 109.99, 109.99, 109.99, 34.99, 19.99, 19.99, 19.99, 34.99, 19.99, 19.99, 19.99, 34.99, 19.99, 19.99, 19.99, 139.99, 109.99, 109.99, 109.99, 109.99))
(52,CompactBuffer(170.0, 10.0, 170.0, 170.0, 170.0, 65.0, 130.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 54.99))
(4,CompactBuffer(299.98, 59.98, 69.99, 249.97, 29.99, 99.0, 39.99, 99.95, 28.0, 299.99, 159.99, 999.99, 299.99, 179.97, 209.99, 21.99, 199.99, 1799.99, 29.99, 309.99, 179.97, 79.99, 349.98, 149.99))
(16,CompactBuffer(209.99, 124.99, 79.99, 149.99, 179.99, 149.99, 299.99, 34.99, 249.99, 349.99, 29.99, 29.99, 119.99, 44.99, 149.99, 34.99, 44.99, 124.99, 99.99, 99.99, 27.99, 64.99, 99.99, 31.99))
(55,CompactBuffer(85.0, 85.0, 20.0, 24.0, 20.0, 28.0, 28.0, 28.0, 28.0, 85.0, 22.0, 22.0, 20.0, 26.0, 18.0, 28.0, 32.0, 25.0, 72.0, 20.0, 9.99, 9.99, 9.99, 9.99))
(54,CompactBuffer(99.99, 63.99, 87.99, 129.99, 39.99, 51.99, 149.99, 55.99, 199.99, 99.99, 63.99, 54.99, 299.99, 79.99, 299.99, 51.99, 34.99, 49.99, 44.99, 34.99, 55.99, 69.99, 179.99, 63.99))
(29,CompactBuffer(79.99, 49.99, 149.99, 4.99, 199.99, 149.99, 39.99, 15.99, 169.99, 139.99, 139.99, 139.99, 139.99, 139.99, 99.99, 99.99, 99.99, 99.99, 30.0, 30.0, 30.0, 30.0, 99.99, 99.99))

I have tried sortBy, but I am unable to sort on the values in the second element.

Any suggestions, please?


#2

@vm109:

It is very difficult to diagnose your issue without the previous steps. As far as I can tell, the steps below should give some idea of how to solve it.
You said sortBy is required on the second element. Whether you want to sort ASCENDING or DESCENDING also needs to be considered. If you want DESCENDING order, negate the value in the map transformation after the split.

sortBy: this will not work directly in this case, because your data is in a collection (CompactBuffer) inside the tuple, so first you have to take the CompactBuffer out of each record. For that, use map with dot notation and convert it into a list so it can be sorted:

RDD.map(rec => (rec._1, rec._2.toList))
After this step, your first record should look like this:

(13,(79.99, 34.99, 44.99, 199.99, 34.99, 44.99, 27.99, 31.99, 27.99, 21.99, 27.99, 31.99, 34.99, 44.99, 69.99, 34.99, 99.99, 31.99, 27.99, 31.99, 34.99, 34.99, 44.99, 69.99))
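The toList conversion above matters because Iterable itself has no sorted method; sorting lives on sequence types. A minimal sketch in plain Scala (no Spark; the values are made up):

```scala
// An Iterable such as Spark's CompactBuffer cannot be sorted directly;
// converting it to a List first makes sorted/sortBy available.
val rec: (Int, Iterable[Float]) = (13, Iterable(79.99f, 34.99f, 44.99f))

// rec._2.sorted would not compile against Iterable; toList gives a List.
val asList: (Int, List[Float]) = (rec._1, rec._2.toList)
val sortedRec = (asList._1, asList._2.sorted)
// sortedRec == (13, List(34.99, 44.99, 79.99))
```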

  1. val testDataMap = testData.map(rec => ((rec.split(",")(1).toInt, rec.split(",")(4).toFloat), rec))

Here testData is assumed to be the RDD you created before this step.

  2. val testDataSorted = testDataMap.sortByKey() [this step is for information purposes]

After the sort completes, discard the KEY; for that you have to use map again:

  3. val testDataSortedFINAL = testDataMap.sortByKey().map(rec => rec._2)
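The three steps above (build a composite key, sort by it, then drop the key) can be sketched on plain Scala collections. The CSV lines here are made-up samples, not the real data; only the (1) and (4) field positions match the snippets above:

```scala
// Made-up sample records: id,categoryId,desc,name,price
val testData = List(
  "1,13,desc,name,79.99",
  "2,13,desc,name,34.99",
  "3,19,desc,name,129.99"
)

// Step 1: build a composite (categoryId, price) key, keep the record as value.
val keyed = testData.map(rec =>
  ((rec.split(",")(1).toInt, rec.split(",")(4).toFloat), rec))

// Step 2: sort by that key (sortByKey on a pair RDD; sortBy(_._1) here).
val sortedKeyed = keyed.sortBy(_._1)

// Step 3: discard the key, keeping only the original records.
val result = sortedKeyed.map(_._2)
// result == List("2,13,desc,name,34.99", "1,13,desc,name,79.99", "3,19,desc,name,129.99")
```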

Please confirm whether this solves your issue, or post all the steps with minimal data so I can write a complete solution for you.
Thanks
Venkat


#3

@avr8082

Venkat, first of all, thanks for taking the time to look at this.
Sorry I couldn't provide the previous steps, as the RDD was the result of an extract from a DB.

But the solution you provided only gives a list of values sorted by key; the actual requirement is to sort the values inside the list.

val testDataMap = testData.map(rec => ((rec.split(",")(1).toInt, rec.split(",")(4).toFloat), rec))
What is your intention in taking the 4th element here?

Let me try to rephrase my question with an example:

scala> val d : (Int,Iterable[Double])= ((13,Iterable(54.99,53.00,42,86.55)))
d: (Int, Iterable[Double]) = (13,List(54.99, 53.0, 42.0, 86.55))

scala> val e : (Int,Iterable[Double])= ((15,Iterable(54.99,53.00,42,86.55,100,45,3,123)))
e: (Int, Iterable[Double]) = (15,List(54.99, 53.0, 42.0, 86.55, 100.0, 45.0, 3.0, 123.0))

scala> val f = sc.parallelize(List(d,e))
f: org.apache.spark.rdd.RDD[(Int, Iterable[Double])] = ParallelCollectionRDD[263] at parallelize at <console>:32

scala> f.collect
res157: Array[(Int, Iterable[Double])] = Array((13,List(54.99, 53.0, 42.0, 86.55)), (15,List(54.99, 53.0, 42.0, 86.55, 100.0, 45.0, 3.0, 123.0)))

In this example, I am looking for code that would produce the following output when I call f.collect:

Array((13,List(42.0,53.0, 54.99, 86.55)), (15,List(3.0,42.0, 45.0, 53.0, 54.99, 86.55, 100.0, 123.0)))
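For what it's worth, one way to get this output is to sort each value list after converting it to a List. A sketch on plain collections; on the RDD f, the same pairing function inside map (or mapValues on the pair RDD) should give the same result:

```scala
// Plain-Scala sketch (no Spark): sort the values inside each pair.
val d: (Int, Iterable[Double]) = (13, Iterable(54.99, 53.00, 42, 86.55))
val e: (Int, Iterable[Double]) = (15, Iterable(54.99, 53.00, 42, 86.55, 100, 45, 3, 123))

// Convert each Iterable to a List so it can be sorted, keep the key as-is.
val sortedPairs = List(d, e).map { case (k, vs) => (k, vs.toList.sorted) }
// sortedPairs == List((13, List(42.0, 53.0, 54.99, 86.55)),
//                     (15, List(3.0, 42.0, 45.0, 53.0, 54.99, 86.55, 100.0, 123.0)))
```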


#4

@vm109:

Can you please send me your GitHub data link? I will work on this and let you know.
Thanks
Venkat


#5

Here is the problem I was trying in the Cloudera VM:

  1. Copy the "retaildb.products" table to HDFS as a text file.
  2. Now sort the products data by product price per category; use the productcategoryid
    column to group by category.

Accomplish this using Spark alone, not DataFrames or Spark SQL.


#6

@vm109 @avr8082

Is this what you are looking for?

val products = sc.textFile("/public/retail_db/products")
val productsMap = products.
  filter(product => product.split(",")(4) != "").
  map(product => (product.split(",")(1).toInt, product.split(",")(4).toFloat))
val productsGBK = productsMap.groupByKey.sortByKey()
val SortedList = productsGBK.map(x => {
  val sorting = x._2.toList.sortBy(o => o)
  (x._1, sorting)
})
SortedList.collect.foreach(println)

output:

(54,List(34.99, 34.99, 39.99, 44.99, 49.99, 51.99, 51.99, 54.99, 55.99, 55.99, 63.99, 63.99, 63.99, 69.99, 79.99, 87.99, 99.99, 99.99, 129.99, 149.99, 179.99, 199.99, 299.99, 299.99))
(55,List(9.99, 9.99, 9.99, 9.99, 18.0, 20.0, 20.0, 20.0, 20.0, 22.0, 22.0, 24.0, 25.0, 26.0, 28.0, 28.0, 28.0, 28.0, 28.0, 32.0, 72.0, 85.0, 85.0, 85.0))
(56,List(9.99, 25.0, 28.0, 28.0, 29.99, 29.99, 30.0, 39.99, 50.0, 55.0, 70.0, 70.0, 70.0, 75.0, 90.0, 90.0, 90.0, 90.0, 90.0, 90.0, 90.0, 90.0, 159.99, 159.99))
(57,List(0.0, 9.99, 29.99, 29.99, 34.99, 39.99, 44.99, 59.97, 59.99, 59.99, 70.0, 75.0, 90.0, 90.0, 90.0, 90.0, 90.0, 99.99, 109.99, 134.99, 159.99, 159.99, 174.99, 189.99))
(58,List(22.0, 28.0, 28.0, 30.0, 30.0, 50.0, 50.0, 50.0, 50.0, 50.0, 60.0, 60.0, 60.0, 100.0, 100.0, 100.0, 100.0, 100.0, 100.0, 100.0, 115.0, 130.0, 194.0, 241.0))
(59,List(28.0, 30.0, 30.0, 30.0, 30.0, 32.0, 32.0, 34.0, 69.97, 70.0, 100.0, 100.0, 100.0, 100.0, 100.0, 100.0, 100.0, 100.0, 100.0, 100.0, 100.0, 100.0, 100.0, 100.0))
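A side note on the sort step: sortBy(o => o) is equivalent to sorted, and negating the key (as Venkat mentioned earlier) gives descending order. A plain-Scala sketch with made-up prices:

```scala
// Equivalent ways to sort a per-category price list.
val prices = List(99.99f, 34.99f, 63.99f)

val ascendingA = prices.sortBy(o => o)  // as used in the answer above
val ascendingB = prices.sorted          // same result, shorter
val descending = prices.sortBy(p => -p) // the "negate the key" trick
// ascendingA == ascendingB == List(34.99, 63.99, 99.99)
// descending == List(99.99, 63.99, 34.99)
```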

Please reply if this is not what you were looking for.


#7

@raghu yes

Thank you

I didn't know it could be done like this: x._2.toList.sortBy(o => o)


#8

I used the same code, but got the error below.

:33: error: recursive value sorting needs type
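A guess, since the full session isn't shown: this error often appears when the multi-line block is pasted into spark-shell line by line instead of via :paste mode, and the REPL then treats sorting as self-referential. Using :paste, or giving the intermediate val an explicit type, usually avoids it. A plain-collection stand-in for the Spark step, with made-up data, showing the annotated version:

```scala
// Hypothetical stand-in for the Spark step (plain Scala, made-up data).
// The explicit List[Float] annotation on `sorting` is the kind of type
// annotation that sidesteps "recursive value sorting needs type".
val productsGBK: List[(Int, Iterable[Float])] =
  List((54, List(99.99f, 34.99f, 63.99f)))

val SortedList = productsGBK.map(x => {
  val sorting: List[Float] = x._2.toList.sortBy(o => o)
  (x._1, sorting)
})
// SortedList == List((54, List(34.99, 63.99, 99.99)))
```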