Why is my sortBy descending sort not working? Thanks


#1

//sort by product_id
dailyRevenuePerProductId.take(10).sortBy(_._1._2).foreach(println)
((2013-12-11,116),179.96)
((2013-08-17,235),34.99)
((2014-01-15,365),7558.7393)
((2014-07-11,565),210.0)
((2014-04-22,627),1359.6599)
((2014-07-12,728),260.0)
((2013-11-04,775),29.97)
((2013-09-24,792),14.99)
((2014-04-06,793),44.97)
((2014-07-14,957),4499.7)

Now I want to sort in descending order, so I added false as a second argument to sortBy:
dailyRevenuePerProductId.take(10).sortBy(_._2,false).foreach(println)

I got this error message:
<console>:44: error: too many arguments for method sortBy: (f: (((String, Int), Float)) => B)(implicit ord: scala.math.Ordering[B])Array[((String, Int), Float)]
dailyRevenuePerProductId.take(10).sortBy(_._2,false).foreach(println)

Is this caused by a different Spark version? According to this doc, the false option is available:
https://spark.apache.org/docs/2.0.1/api/java/org/apache/spark/rdd/RDD.html#sortBy(scala.Function1,%20boolean,%20int,%20scala.math.Ordering,%20scala.reflect.ClassTag)

If yes, what is the exam’s version?

I checked the doc for spark 1.6.2 which is the lab version:
https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/rdd/RDD.html#sortBy(scala.Function1,%20boolean,%20int,%20scala.math.Ordering,%20scala.reflect.ClassTag)

It seems the false option is also available.

Can someone help answer this?

I understand this can be achieved with sortWith, but I just want to know why sortBy throws an error for me.

Thank you very much.


#2

@paslechoix
You need to change how you pass the key to the sortBy function. It is not able to map it directly, so try:

dailyRevenuePerProductId.take(10).sortBy(rec => rec._1._2).foreach(println)

I tried sorting in descending order by revenue and it came out fine. Here is the complete code:

val productData = sc.textFile("/user/varunu28/retail_db/products")
val ordersData = sc.textFile("/user/varunu28/retail_db/orders")
val order_itemsData = sc.textFile("/user/varunu28/retail_db/order_items")

val orderMap = ordersData.
                map(rec => (rec.split(",")(0).toInt, rec.split(",")(1).split(" ")(0)))

val order_itemsMap = order_itemsData.
                map(rec => (rec.split(",")(1).toInt, (rec.split(",")(2).toInt, rec.split(",")(4).toFloat)))

val productsMap = productData.
                map(rec => (rec.split(",")(0).toInt, rec))

val ordersJoin = orderMap.join(order_itemsMap)

val ordersJoinMap = ordersJoin.map(rec => ((rec._2._2._1), (rec._2._1, rec._2._2._2)))

val productsJoin = productsMap.join(ordersJoinMap)

val dailyRevenuePerProdId = productsJoin.
                                map(rec => ((rec._2._2._1, rec._1), rec._2._2._2)).
                                reduceByKey(_+_)

dailyRevenuePerProdId.sortBy(rec => rec._2, false)

#3

Thank you for your reply, but what am I missing in my script? I just refactored it here:

val orders = sc.textFile("/public/retail_db/orders")
val orderItems = sc.textFile("/public/retail_db/order_items")
val ordersFiltered = orders.
filter(order => order.split(",")(3) == "COMPLETE" || order.split(",")(3) == "CLOSED")

// Convert filtered orders to key value pair <orderId, orderDate>
val ordersMap = ordersFiltered.
map(order => (order.split(",")(0).toInt, order.split(",")(1)))
val orderItemsMap = orderItems.
map(oi => (oi.split(",")(1).toInt,(oi.split(",")(2).toInt, oi.split(",")(4).toFloat)))
val ordersJoin = ordersMap.join(orderItemsMap)
val ordersJoinMap = ordersJoin.map(rec => ((rec._2._1, rec._2._2._1), rec._2._2._2))
val dailyRevenuePerProductId = ordersJoinMap.
reduceByKey((revenue, order_item_subtotal) => revenue + order_item_subtotal)
dailyRevenuePerProductId.take(10).sortBy(_._2, false).foreach(println)

The last line failed with:
<console>:44: error: too many arguments for method sortBy: (f: (((String, Int), Float)) => B)(implicit ord: scala.math.Ordering[B])Array[((String, Int), Float)]
dailyRevenuePerProductId.take(10).sortBy(_._2, false).foreach(println)

If I change it to:
dailyRevenuePerProductId.take(10).sortWith(_._2 > _._2).foreach(println)

Then it works as expected.

Thank you.


#4

@paslechoix
I pointed out exactly this solution in my previous reply.

Change your code from

dailyRevenuePerProductId.take(10).sortBy(_._2, false).foreach(println)

To

dailyRevenuePerProductId.sortBy(rec => rec._2, false).take(10).foreach(println)

The main reason for your error is that you call take before sortBy. take(10) returns a local Scala Array rather than an RDD, and Array's sortBy accepts only the key function (there is no ascending flag), which is why the compiler reports too many arguments.

To resolve this, either use the takeOrdered Spark API, or parallelize the Array first and then perform the sort.

Here is the code for the second option:

sc.parallelize(dailyRevenuePerProductId.take(10)).
     sortBy(_._2, false).
     collect().foreach(println)
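
If the ten rows are already on the driver, another option might be to sort the local Array directly. Below is a minimal sketch in plain Scala (no Spark needed), assuming a few rows copied from the output earlier in this thread; the names sample and byRevenueDesc are made up for illustration:

```scala
// Hypothetical sample: three rows copied from the output in the first post.
val sample: Array[((String, Int), Float)] = Array(
  (("2013-12-11", 116), 179.96f),
  (("2013-08-17", 235), 34.99f),
  (("2014-01-15", 365), 7558.7393f)
)

// Array.sortBy takes only the key function; the Ordering lives in a
// second (normally implicit) parameter list. Passing a reversed
// Ordering explicitly sorts by revenue in descending order.
val byRevenueDesc = sample.sortBy(_._2)(Ordering[Float].reverse)

byRevenueDesc.foreach(println)
```

Because sortBy on an Array takes its Ordering in a second parameter list, descending order is expressed through the Ordering rather than through a boolean flag.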

#5

Thanks. The reason I want to take(10) first is to get a subset with distinct values before it is sorted, e.g.

	dailyRevenuePerProductId.sortBy(_._1._2).take(10).foreach(println)
	((2014-07-24 00:00:00.0,19),124.99)
	((2013-12-30 00:00:00.0,19),124.99)
	((2014-01-16 00:00:00.0,19),124.99)

This doesn’t really show whether the sort worked, because every row has the same product_id (_._1._2 is 19) and the same revenue (124.99).

But if I take(10) first:

dailyRevenuePerProductId.take(10).sortBy(_._1._2).foreach(println)

((2013-10-11 00:00:00.0,116),224.95)
((2014-07-15 00:00:00.0,135),110.0)
((2014-07-17 00:00:00.0,403),3379.7402)

Here you can tell right away that the script is working.

It is also interesting to notice that there are two sorting methods available: sortBy and sortWith:

dailyRevenuePerProductId.take(10).sortWith(_._2 > _._2).foreach(println)

((2014-01-15,365),7558.7393)
((2014-07-14,957),4499.7)
((2014-04-22,627),1359.6599)
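
For a local Array, the two methods can express the same descending sort. Here is a small sketch in plain Scala, assuming three rows taken from the first post; rows, viaSortWith and viaSortBy are hypothetical names:

```scala
// Hypothetical sample: three rows copied from the output in the first post.
val rows: Array[((String, Int), Float)] = Array(
  (("2014-07-11", 565), 210.0f),
  (("2014-04-22", 627), 1359.6599f),
  (("2014-07-14", 957), 4499.7f)
)

// sortWith takes a comparator that answers "should a come before b?".
val viaSortWith = rows.sortWith((a, b) => a._2 > b._2)

// sortBy takes a key function; negating a numeric key reverses the order.
val viaSortBy = rows.sortBy(rec => -rec._2)

// Both approaches should produce the same descending-by-revenue order.
println(viaSortWith.sameElements(viaSortBy))
```

sortWith is handy when the comparison is easier to write directly, while sortBy tends to read better when there is a single key to extract.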