DataFrame / Spark SQL - Top N Priced Products


#1

Hi,
I am trying to find the top N priced products per category using DataFrames and Spark SQL.
I registered a DataFrame as a temporary table called products, built from the filtered products data.
val rankedProducts = sqlContext.sql("select ProductId, categoryId, Name, price, rank() over (partition by categoryId order by price desc)" +
" as rnk_revenue, dense_rank() over (partition by categoryId order by price desc) as dense_rnk_revenue from products where dense_rank() over (partition by categoryId order by price desc) < 10 order by categoryId, price desc")
This query fails. I also tried filtering on the dense_rnk_revenue alias instead; here is the REPL output:
scala> val rankedProducts = sqlContext.sql("select ProductId, categoryId, Name, price, rank() over (partition by categoryId order by price desc)" +
| " as rnk_revenue, dense_rank() over (partition by categoryId order by price desc) as dense_rnk_revenue from products where dense_rnk_revenue < 10 order by categoryId, price desc")
19/01/01 08:34:59 INFO ParseDriver: Parsing command: select ProductId, categoryId, Name, price, rank() over (partition by categoryId order by price desc) as rnk_revenue, dense_rank() over (partition by categoryId order by price desc) as dense_rnk_revenue from products where dense_rnk_revenue < 10 order by categoryId, price desc
19/01/01 08:34:59 INFO ParseDriver: Parse Completed
org.apache.spark.sql.AnalysisException: cannot resolve 'dense_rnk_revenue' given input columns: [ProductId, categoryId, Name, price]; line 1 pos 222
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:60)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:108)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:118)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
Please help me get the top 10 priced products per category using the rank function in Spark SQL.


#2

@zaheer_shaik Window functions cannot be referenced in the WHERE clause of the same query block: the WHERE is evaluated before the window functions are computed, which is why the dense_rnk_revenue alias cannot be resolved. Compute the ranks in a subquery and filter in the outer query:

val topNPricedProducts = sqlContext.sql("""
  select * from (
    select product_id, product_price, category_id, category_name,
      rank() over (partition by category_id order by product_price desc) price_rank,
      dense_rank() over (partition by category_id order by product_price desc) price_dense_rank
    from products join categories on product_category_id = category_id
  ) tmp
  where price_dense_rank <= 10
  order by category_id, price_rank
""")

Note the descending order by product_price inside the window: without desc the query would rank the lowest-priced products first.
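For intuition, the same dense_rank-then-filter semantics can be sketched in plain Scala collections, with no cluster needed. This is only an illustration of the ranking logic, not Spark code; the Product case class and sample prices below are made up, not the retail_db schema:

```scala
// Minimal sketch of dense_rank() over (partition by category order by price desc),
// followed by the "rank <= n" filter that the SQL subquery performs.
case class Product(id: Int, categoryId: Int, name: String, price: Double)

def denseRankTopN(products: Seq[Product], n: Int): Seq[(Product, Int)] =
  products.groupBy(_.categoryId).toSeq.sortBy(_._1).flatMap { case (_, ps) =>
    // dense_rank: distinct prices sorted high-to-low; rank = position + 1,
    // so tied prices share a rank and no rank numbers are skipped
    val rankOf = ps.map(_.price).distinct.sorted(Ordering[Double].reverse)
      .zipWithIndex.map { case (price, i) => price -> (i + 1) }.toMap
    ps.map(p => (p, rankOf(p.price)))
      .filter { case (_, rank) => rank <= n }          // the outer WHERE
      .sortBy { case (p, rank) => (rank, -p.price) }   // the outer ORDER BY
  }

val products = Seq(
  Product(1, 10, "A", 99.99),
  Product(2, 10, "B", 99.99),  // tie with A: both get dense rank 1
  Product(3, 10, "C", 49.99),
  Product(4, 10, "D", 9.99),
  Product(5, 20, "E", 19.99)
)

// Top 2 per category: the tie keeps both A and B at rank 1,
// C takes rank 2, and D (rank 3) is filtered out.
val top2 = denseRankTopN(products, 2)
```

Because ties share a dense rank, "top 2 priced" can return more than two rows per category, which is usually the desired behaviour for this exercise.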