Spark: control does not go inside filter, foreach, or foreachPartition when submitting a job to a cluster

apache-spark

Hi,
I am facing an issue when submitting a Spark job to a cluster.

I wrote a simple example that creates a JavaRDD and calls foreach on it. It works fine when run from the Eclipse IDE, but when I submit the same job to the cluster, execution never seems to go inside the filter, foreach, or foreachPartition closures (none of the log/println statements inside them show up).
Please help me.

The code is below.

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class WordCountTr implements Serializable {

    public static Logger logger = Logger.getLogger(WordCountTr.class);

    public void execute(String t, int a) {
        System.out.println("key " + t + " a ============================================= " + a);
        logger.info("key " + t + " a ========================================== " + a);
    }

    public static void main(String[] args) {
        JavaSparkContext sc = getSparkcontext();
        // Input directory on HDFS: /user/jp/emp
        JavaRDD<String> data = sc.textFile("hdfs://127.0.1.1:8020/user/jp/emp/");
        String first = data.first();
        logger.info("data.first() ====================================== " + data.first());
        System.out.println("data.first() ================================== " + data.first());

        // Drop the header line (the first record of the file)
        JavaRDD<String> clean = data.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String arg0) throws Exception {
                logger.info("filter  ========================================================= " + arg0);
                System.out.println("filter   =================================================" + arg0);
                return !arg0.equalsIgnoreCase(first);
            }
        });

        for (String s : clean.take(41)) {
            logger.info("for    ==================================================" + s);
            System.out.println("for   ==================================================" + s);
        }
        System.out.println("Rdd count " + clean.count());

        if (!clean.isEmpty()) {
            WordCountTr c = new WordCountTr();
            System.out.println("inside if============================================");
            logger.info("inside if===================================================");
            clean.foreachPartition(f -> {
                System.out.println("inside foreachPartition===============================");
                logger.info("inside foreachPartition==========================================");
                int i = 0;
                List<String> l = new ArrayList<>();
                while (f.hasNext()) {
                    c.execute(f.next(), ++i);
                    if (i == 2)
                        i = 0;
                    System.out.println("out foreachPartition=============================================");
                    logger.info("out foreachPartition=======================================");
                }
            });
        }

        for (String s : clean.take(81)) {
            logger.info("for ss   ==============================================" + s);
            System.out.println("for  ss ==========================================" + s);
        }
    }

    public static JavaSparkContext getSparkcontext() {
        // "local[*]" is only used if no master is set; spark-submit --master overrides it
        SparkConf conf = new SparkConf().setIfMissing("spark.master", "local[*]")
                .setAppName("Word Count");
        return new JavaSparkContext(conf);
    }
}
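
For reference, here is a minimal sketch of a check that could make the result of the filter closure visible on the driver. It is only a sketch, assuming Spark 2.x: the LongAccumulator and the name "rowsSeenInFilter" are my own additions, and sc, data, and first are reused from the code above. As far as I understand, the println/log4j calls inside filter and foreachPartition run on the executors, so their output would go to the executor logs rather than the driver console, while an accumulator value is sent back to the driver.

import org.apache.spark.util.LongAccumulator;

// Hypothetical driver-side check: count how many records the filter closure
// actually processes on the executors.
LongAccumulator seen = sc.sc().longAccumulator("rowsSeenInFilter");
JavaRDD<String> checked = data.filter(line -> {
    seen.add(1);                                  // runs on the executors
    return !line.equalsIgnoreCase(first);
});
checked.count();                                  // action that forces the filter to run
System.out.println("rows seen inside filter = " + seen.value());   // printed on the driver

If the counter comes back greater than zero, the filter did run on the cluster and the missing output is only a question of where the executor logs end up.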

Any help is appreciated.

Thanks in advance
