Spark, Top 10 customers for each chain

Hi Friends,

I have retail transaction data that I have loaded into HDFS and processed using Spark.
Data structure:
Store chain, total amount spent by the customer, customer ID

Objective: To find the top ten customers for each store chain

I have already processed the data so that it is sorted by chain and by amount spent in descending order.
How do I extract only 10 rows for each chain?
The output as it stands now is given below:

((80.0, 5941.9299999999021), 92795637.0)
((80.0, 5826.7799999999061), 95508708.0)
((80.0, 5665.3499999999258), 86228042.0)
((80.0, 5639.7899999999172), 16961724.0)
((80.0, 5613.0699999999042), 91466094.0)
((80.0, 5587.7799999999243), 78057763.0)
((80.0, 5400.889999999933), 55319724.0)
((80.0, 4959.4799999999532), 57172147.0)
((80.0, 4863.1499999999369), 17041984.0)
((80.0, 4858.1399999999385), 80415084.0)
((80.0, 4667.1199999999271), 71298343.0)
((80.0, 4640.1099999999442), 97030643.0)
((80.0, 4485.4499999999389), 16412251.0)
((80.0, 4446.2299999999459), 68405887.0)
((80.0, 4350.2399999999616), 21303030.0)
((95.0, 4303.0299999999606), 96705509.0)
((95.0, 4162.3999999999569), 82793956.0)
((95.0, 4118.1799999999539), 12262064.0)
((95.0, 3954.2499999999673), 22221380.0)
((95.0, 3932.6699999999814), 76011413.0)
((95.0, 3706.8399999999692), 83845536.0)
((95.0, 3639.4399999999641), 48693550.0)
((95.0, 3587.7099999999728), 12332190.0)
((95.0, 3533.2999999999729), 92550110.0)
((95.0, 3493.6199999999822), 95268323.0)
((95.0, 3431.859999999971), 59600890.0)

Can anyone please advise?
I am using PySpark.

@Majid_Ansari
package com.mycompany.opensource.spark.core

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by email2dgk on 2/11/2017.
  */
object Top10Chains {

  def main(args: Array[String]): Unit = {

    // Silence Spark's verbose logging for cleaner console output.
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("Top10Chains").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // One record per input line: chain,total,customerId
    case class Customer(chain: String, total: String, customerId: String) {
      override def toString = s"$chain,$total,$customerId"
    }

    val inputFile = "customerNStores.txt"
    val inputRdd = sc.textFile(inputFile).map { x =>
      val z = x.split(",")
      Customer(z(0), z(1), z(2))
    }

    // Key every record by its chain so we can group per chain.
    val inputKvRdd = inputRdd.map(x => (x.chain, x))

    val inputGroupRdd = inputKvRdd.groupByKey()

    // For each chain, sort its customers by total spent (descending)
    // and keep only the top 10.
    inputGroupRdd.flatMap {
      _._2.toList.sortBy(k => -k.total.toDouble).take(10)
    }.foreach(println)

    sc.stop()
  }
}

You can try the same approach in PySpark…
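Here is a minimal PySpark sketch of the same groupByKey-and-take approach. It assumes a comma-separated input file named customerNStores.txt with lines of the form chain,total,customer_id; the file name and column order simply mirror the Scala example above and are only assumptions, so adapt the parsing to however your sorted RDD is actually produced.

from pyspark import SparkContext

sc = SparkContext(master="local[4]", appName="Top10Chains")

# Hypothetical input file with one record per line: chain,total,customer_id
# (mirroring the Scala example above).
lines = sc.textFile("customerNStores.txt")

# Parse each line into (chain, (total, customer_id)).
parsed = lines.map(lambda line: line.split(",")) \
              .map(lambda f: (f[0], (float(f[1]), f[2])))

# Group the (total, customer_id) pairs per chain, sort each group by total
# spent in descending order, and keep only the first 10 entries.
top10 = parsed.groupByKey().flatMap(
    lambda kv: [(kv[0], total, cust)
                for total, cust in sorted(kv[1], key=lambda t: -t[0])[:10]])

for row in top10.collect():
    print(row)

sc.stop()

One thing to keep in mind: groupByKey materialises every customer of a chain on a single executor. If some chains have very large numbers of customers, keeping only a bounded top 10 per chain while aggregating (for example with aggregateByKey and a small heap) will scale better.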

Can I ask a question?

Yes, please post your question here.