Exercise 6: What should the return type be?

Exercise 06: Get the top n traded stocks by volume for each month from NYSE data within a given year

What should the return type of the function below be? I got the following error.
Error:
:14: error: type mismatch;
found : Unit
required: (String, Iterable[(String, Long)])
}

def topNStocksByVolumeMonth(rec: (String, Iterable[(String, Long)]), topN: Int): (String, Iterable[(String, Long)]) =
{
  val list = rec._2.toList
  val topNVolume = list.map(r => r._2.toLong).sortBy(k => -k.toLong).distinct.take(topN)
  val sortedList = list.sortBy(k => k._2.toLong).filter(r => topNVolume.contains(r._2.toLong)).
    map(r => (rec._1, r))
  // rec._1 = String, r = List(("Stock ticker", volume))

}

If I am returning the wrong thing, how do I return (String, Iterable[(String, Long)])?

Please find a sample below:

val date_group = sum_total.groupByKey()

def getTopNcustomer(a: (Int, Iterable[(Int, Float)]), topn: Int) = {
  val x = a._2.toList.sortBy(o => -o._2)
  val high_total = x.map(t => t._2).distinct.take(topn).min
  val final_customer = x.takeWhile(q => q._2.toFloat >= high_total)
  final_customer
}

val filter_cus = date_group.flatMap(e => getTopNcustomer(e, 5).map(a => (e._1,a)))
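
Applying the same pattern to the function in the question: the "found: Unit" error appears because the last statement in the body is a val definition, and in Scala a block evaluates to its last expression. A minimal sketch of one possible fix follows. It ends the body with a tuple of the declared type and drops the final map(r => (rec._1, r)), since that would produce nested pairs rather than (String, Long). It also sorts highest volume first, which is an assumption about the intended order. The sample data is hypothetical and only there to show the shape of the result.

```scala
// Sketch only: the last expression in the body is the value that gets returned.
def topNStocksByVolumeMonth(
    rec: (String, Iterable[(String, Long)]),
    topN: Int
): (String, Iterable[(String, Long)]) = {
  val list = rec._2.toList
  // Distinct volumes, highest first, limited to topN values
  val topNVolume = list.map(_._2).distinct.sortBy(v => -v).take(topN)
  // Keep every stock whose volume is one of those values, highest volume first
  val sortedList = list.filter(r => topNVolume.contains(r._2)).sortBy(r => -r._2)
  // Final expression: a tuple matching the declared return type
  (rec._1, sortedList)
}

// Hypothetical sample data, for illustration only
val sample = ("201501", Iterable(("AAA", 300L), ("BBB", 500L), ("CCC", 300L), ("DDD", 100L)))
val result = topNStocksByVolumeMonth(sample, 2)
```

Because this version returns one (key, values) pair per input record, it would be used with map on the grouped RDD instead of flatMap, for example grouped.map(rec => topNStocksByVolumeMonth(rec, 5)), where grouped is a placeholder name for the output of groupByKey().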

Thanks… I got the idea.

Very nice practice question. I first worked through the problem using 2015 as the year and n = 5. This is what I did. I would very much appreciate any feedback. Thank you.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.dense_rank

// Returns a DataFrame with the results.
// Assumes a spark-shell session, where sc and the implicits for toDF and $ are already in scope.
def topNTradedStocks(n: Int, year: String, inputFilesPath: String): org.apache.spark.sql.DataFrame = {
  // [stockticker: string, year_month: int, volume: bigint]
  val stockDataDF = sc.
    textFile(inputFilesPath).
    filter(rec => {
      val arr = rec.split(",")
      arr(1).substring(0, 4).equals(year)
    }).
    map(rec => {
      val arr = rec.split(",")
      (arr(0), arr(1).substring(0, 6).toInt, arr(6).toLong)
    }).
    toDF("stockticker", "year_month", "volume")

  // Rank stocks by volume within each month; filter on n rather than a hard-coded 5
  val resultDF = stockDataDF.
    withColumn(
      "dense_rnk",
      dense_rank().over(Window.partitionBy($"year_month").orderBy($"volume".desc))
    ).
    filter($"dense_rnk".leq(n)).
    sort($"year_month".asc, $"dense_rnk".desc)
  resultDF
}

val resultDF = topNTradedStocks(5, "2015", "/user/devfactor/nyse")

The "top N prices by category" lesson in Durga sir's videos uses RDDs. However, I have noticed that execution time with DataFrames is much better than with RDDs. I also see better performance with Hive queries, which are easier for those familiar with SQL. So I have become very familiar with DataFrame operations, and I am practicing more with DataFrames and Hive queries. I am about to take the exam for the second time. Any advice is very much appreciated. Thank you.

Taking the year as 2016 and n as 5:

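from pyspark.sql.functions import col, date_format, rank, to_timestamp
from pyspark.sql.window import Window

# nyse is assumed to be a DataFrame with transactiondate (yyyyMMdd) and volume columns
top5StockByVolume2016 = (
    nyse.withColumn('transactiondate', to_timestamp('transactiondate', 'yyyyMMdd'))
    .where(date_format('transactiondate', 'yyyy') == '2016')
    .withColumn('month', date_format('transactiondate', 'MM'))
    .withColumn('rnk', rank().over(Window.partitionBy(col('month')).orderBy(col('volume').desc())))
    .where(col('rnk') <= 5)
    .orderBy(col('month'))
)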