Removing duplicate rows based on specific columns

How do I remove duplicates from a Spark RDD based on specific columns? Here is my sample data:

566,26,adidas Men’s Germany Black/Red Away Match Soc,90.0,http://images.acmesports.sports/adidas+Men’s+Germany+Black%2FRed+Away+Match+Soccer+Jersey

569,26,adidas Men’s Germany Home Soccer Jersey,90.0,http://images.acmesports.sports/adidas+Men’s+Germany+Home+Soccer+Jersey

560,26,adidas Men’s 2014 MLS All-Star Game Replica B,85.0,http://images.acmesports.sports/adidas+Men’s+2014+MLS+All-Star+Game+Replica+Black+Jersey

565,26,adidas Youth Germany Black/Red Away Match Soc,70.0,http://images.acmesports.sports/adidas+Youth+Germany+Black%2FRed+Away+Match+Soccer+Jersey

549,26,Lotto Men’s Zhero Gravity V 700 TF Soccer Cle,59.99,http://images.acmesports.sports/Lotto+Men’s+Zhero+Gravity+V+700+TF+Soccer+Cleat

551,26,Lotto Men’s Zhero Gravity V 700 TF Soccer Cle,59.99,http://images.acmesports.sports/Lotto+Men’s+Zhero+Gravity+V+700+TF+Soccer+Cleat

552,26,Lotto Men’s Zhero Gravity V 700 TF Soccer Cle,59.99,http://images.acmesports.sports/Lotto+Men’s+Zhero+Gravity+V+700+TF+Soccer+Cleat

I want to remove the sixth and seventh rows (IDs 551 and 552), which duplicate the fifth row in every column other than the first.

Hi, apply filter with a custom function, something like:

def skipRows(input: String): Boolean = {
  // custom logic goes here
  // (in fact, I did not find values that differ anywhere other than the first column)
}

and then call rdd.filter(skipRows).

That is exactly the problem: I have to remove duplicates using all the columns except the first one. Anyway, thanks for the help. I will try from my side.

Step 1: Load the data.
Step 2: Take one row.
Step 3: Filter the data (the records that do not match that row).
Step 4: Perform distinct (a rough sketch of the idea follows below).
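A minimal sketch of that idea, keying each line by everything except the first column and keeping one representative row per key (the file name dups.txt and the existing SparkContext sc are assumptions):

val lines = sc.textFile("dups.txt")

val deduped = lines
  .map(line => (line.split(",").drop(1).mkString(","), line)) // key = all columns except the first
  .reduceByKey((keep, _) => keep)                             // keep one full row per key
  .values

deduped.collect().foreach(println)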

Here you go:

test.map(n => n.split(","))
  .map(n => ((n(1), n(2), n(3), n(4), n(5)), n(0)))
  .foldByKey("0")((acc, element) => if (acc == element) acc else element)
  .map(n => (n._2, n._1._1, n._1._2, n._1._4, n._1._5))

I am new to Spark/Scala/Java, so I am not sure whether the code is efficient. It works fine with the sample data you gave (I added quotes around the string data in the file).

Here is the logic:
=> Change the format from (a,b,c,d,e) to ((b,c,d,e),a).
=> Do foldByKey on that data (it is essentially a group-by on (b,c,d,e), with a dummy fold over the unwanted column "a").
=> Change the format back, i.e. from ((b,c,d,e),a) to (a,b,c,d,e).
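To try that pipeline without a file, one option is to parallelize the sample rows as pasted above and run the same steps. The rows as pasted have five comma-separated fields, so this sketch keys on columns 1-4 (adjust the indices if your file has an extra column, as the n(5) in the one-liner suggests); sc is assumed to be an existing SparkContext:

val test = sc.parallelize(Seq(
  "549,26,Lotto Men's Zhero Gravity V 700 TF Soccer Cle,59.99,http://images.acmesports.sports/Lotto+Men's+Zhero+Gravity+V+700+TF+Soccer+Cleat",
  "551,26,Lotto Men's Zhero Gravity V 700 TF Soccer Cle,59.99,http://images.acmesports.sports/Lotto+Men's+Zhero+Gravity+V+700+TF+Soccer+Cleat",
  "552,26,Lotto Men's Zhero Gravity V 700 TF Soccer Cle,59.99,http://images.acmesports.sports/Lotto+Men's+Zhero+Gravity+V+700+TF+Soccer+Cleat",
  "566,26,adidas Men's Germany Black/Red Away Match Soc,90.0,http://images.acmesports.sports/adidas+Men's+Germany+Black%2FRed+Away+Match+Soccer+Jersey"))

val deduped = test.map(n => n.split(","))
  .map(n => ((n(1), n(2), n(3), n(4)), n(0)))                               // key = columns 1-4, value = id
  .foldByKey("0")((acc, element) => if (acc == element) acc else element)   // keep one id per key
  .map(n => (n._2, n._1._1, n._1._2, n._1._3, n._1._4))                     // back to (id, b, c, d, e)

deduped.collect().foreach(println)   // the three Lotto duplicates collapse into one row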

package com.mycompany.opensource.spark.core

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by email2dgk on 3/1/2017.
  */
object RemoveDups {

  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("RemoveDups").setMaster("local[4]")
    val sc = new SparkContext(conf)

    val inputFile = "dups.txt"

    // Key each line by every column except the first; keep the id (as Int) as the value.
    val dataRdd = sc.textFile(inputFile).map { x =>
      val z = x.split(",")
      ((z(1), z(2), z(3), z(4), z(5)), z(0).toInt)
    }

    // Keep the smallest id per duplicate group, then restore the (id, columns) order.
    val out = dataRdd.reduceByKey {
      case (x, y) => if (x > y) y else x
    }.map { case (x, y) => (y, x) }

    out.foreach(println)
    sc.stop()
  }

}
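As a side note, the reduceByKey above keeps the smallest id in each duplicate group. A quick local check of just that combiner, with a made-up key and ids:

val pairs = sc.parallelize(Seq(("dup-key", 551), ("dup-key", 552), ("dup-key", 549)))
val kept = pairs.reduceByKey { case (x, y) => if (x > y) y else x }
kept.collect().foreach(println)   // prints (dup-key,549)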

I sorted out the problem. If anyone is interested, please find below the working code.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import com.typesafe.config.ConfigFactory

object TopNProductsByCategory {

  // Remembers the name + price fields of the previously seen record, so that
  // consecutive duplicates in a price-sorted list can be skipped.
  var temp = ""

  def skipRows(rec: String): Boolean = {
    // A record is treated as a duplicate when its name and price fields
    // match those of the previous record.
    val line = rec.split(",")(2) + rec.split(",")(4)

    if (line.equalsIgnoreCase(temp)) {
      println("inside if")
      temp = line
      false
    } else {
      println("inside else")
      temp = line
      true
    }
  }

  def getTopNProducts(rec: (Int, Iterable[String]), topN: Int): Iterable[String] = {

    // Sort the category's products by price so duplicates become adjacent,
    // then drop the consecutive duplicates with skipRows.
    val productList = rec._2.toList.sortBy(rec => rec.split(",")(4).toFloat)
    val uniqueProductList = productList.filter(rec => skipRows(rec))

    println("####################")
    productList.take(100).foreach(println)
    println("####################")

    println("--------------------")
    uniqueProductList.take(100).foreach(println)
    println("--------------------")

    // Take the N highest prices from the de-duplicated list, then keep every
    // product priced at one of them (simulating a dense rank over price).
    val topNPrices = uniqueProductList.map(x => x.split(",")(4).toFloat).sortBy(x => -x).slice(0, topN)

    val topNPricedProducts = uniqueProductList.sortBy(x => -x.split(",")(4).toFloat).
      filter(x => topNPrices.contains(x.split(",")(4).toFloat))

    topNPricedProducts
  }


  def main(args: Array[String]) {

    val topN = args(0).toInt
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().setAppName("Top " + topN + " priced products in category -- simulating dense rank").
      setMaster("local")
    val sc = new SparkContext(conf)

    val products = sc.textFile("/public/retail_db/products")
    // Exclude product id 685, then group the remaining products by category id.
    val productsFiltered = products.filter(rec => rec.split(",")(0).toInt != 685)
    val productsMap = productsFiltered.map(rec => (rec.split(",")(1).toInt, rec))
    val productsGBK = productsMap.groupByKey()

    productsGBK.flatMap(rec => getTopNProducts(rec, topN)).collect().foreach(println)

  }

}