Kafka-Twitter producer is throwing an exception

Hi,
I ran the Kafka Twitter program, but I get the NullPointerException below after it has been running successfully for some time.
Exception
Hashtag: WRC
Hashtag: Montecarlorally
Hashtag: magic
Hashtag: RallyeMonteCarlo
Hashtag: Seohyun
Hashtag: Magic
java.lang.NullPointerException: Inflater has been closed
at java.util.zip.Inflater.ensureOpen(Unknown Source)
at java.util.zip.Inflater.inflate(Unknown Source)
at java.util.zip.InflaterInputStream.read(Unknown Source)
at java.util.zip.GZIPInputStream.read(Unknown Source)
at sun.nio.cs.StreamDecoder.readBytes(Unknown Source)
at sun.nio.cs.StreamDecoder.implRead(Unknown Source)
at sun.nio.cs.StreamDecoder.read(Unknown Source)
at java.io.InputStreamReader.read(Unknown Source)
at java.io.BufferedReader.fill(Unknown Source)
at java.io.BufferedReader.readLine(Unknown Source)
at java.io.BufferedReader.readLine(Unknown Source)
at twitter4j.StatusStreamBase.handleNextElement(StatusStreamBase.java:85)
at twitter4j.StatusStreamImpl.next(StatusStreamImpl.java:57)
at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:478).

run cmd
java -classpath D:\Fiat_chrysler\Software\hadoop\kafka_2.11-0.9.0.0\libs*;D:\Fiat_chrysler\Software\hadoop\kafka_2.11-0.9.0.0\com\kakfa\test*; com.kakfa.test.KafkaTwitterProducer 0DlIClcF8TpbY0b4TLgyyyyyy SZrCm3l4IkVxtO61LQO3bLDzyLehvHfZ1ANlBi25poiuuyyyy 354075464-NypUsJA58oBGthrsy2gY7OyjlXj6mnvaId6yutre axFe7xfXp3XMpHT07f9fHiPKwG9fHerOJc9nqO7yhgrd my-first-topic magic.

What might be the issue? The Twitter4J version is 3.0.3.

I modified all the token values.

thanks
suresh

I'm getting this error on producer.close();. I'm not able to find the root cause. Anyhow, I have suppressed the error for the time being.

Can you share your Kafka class?

package com.kakfa.test;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
public static void main(String[] args) throws Exception {
final LinkedBlockingQueue queue = new LinkedBlockingQueue(1000);

  if(args.length < 5){
     System.out.println("Usage: KafkaTwitterProducer <twitter-consumer-key> <twitter-consumer-secret> <twitter-access-token> <twitter-access-token-secret>  <topic-name> <twitter-search-keywords>");
     return;
  }

  String consumerKey = args[0].toString();
  String consumerSecret = args[1].toString();
  String accessToken = args[2].toString();
  String accessTokenSecret = args[3].toString();
  String topicName = args[4].toString();
  String[] arguments = args.clone();
  String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);
  TwitterStream twitterStream = null;
  Producer<String, String> producer = null;

  try{
  ConfigurationBuilder cb = new ConfigurationBuilder();
  cb.setDebugEnabled(true)
     .setOAuthConsumerKey(consumerKey)
     .setOAuthConsumerSecret(consumerSecret)
     .setOAuthAccessToken(accessToken)
     .setOAuthAccessTokenSecret(accessTokenSecret);

  twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
  StatusListener listener = new StatusListener() {
     public void onStatus(Status status) {
        queue.offer(status);

        // System.out.println("@" + status.getUser().getScreenName()+ " - " + status.getText());
        // System.out.println("@" + status.getUser().getScreen-Name());

        /*for(URLEntity urle : status.getURLEntities()) {
           System.out.println(urle.getDisplayURL());
        }*/

        /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
           System.out.println(hashtage.getText());
        }*/
     }

     public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
        // System.out.println("Got a status deletion notice id:"+ statusDeletionNotice.getStatusId());
     }

     public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
        // System.out.println("Got track limitation notice:" + num-berOfLimitedStatuses);
     }

     public void onScrubGeo(long userId, long upToStatusId) {
        // System.out.println("Got scrub_geo event userId:" + userId +"upToStatusId:" + upToStatusId);
     }

     public void onStallWarning(StallWarning warning) {
        // System.out.println("Got stall warning:" + warning);
     }

     public void onException(Exception ex) {
       // ex.printStackTrace();
     }
  };
  twitterStream.addListener(listener);

  FilterQuery query = new FilterQuery().track(keyWords);
  twitterStream.filter(query);

  Thread.sleep(5000);

  //Add Kafka producer config settings
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);

  props.put("key.serializer",
     "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer",
     "org.apache.kafka.common.serialization.StringSerializer");

  producer = new KafkaProducer<String, String>(props);
  int i = 0;
  int j = 0;

  while(i < 100) {
     Status ret = queue.poll();

     if (ret == null) {
    	//System.out.println("ret val : "+ret);
        Thread.sleep(100);
        i++;
     }else {
        for(HashtagEntity hashtage : ret.getHashtagEntities()) {
        	//System.out.println("coming else");
           System.out.println("Hashtag: " + hashtage.getText());
           producer.send(new ProducerRecord<String, String>(
              topicName, Integer.toString(j++), hashtage.getText()));
        }
     }
  }
  }
  catch(Exception e){
	  e.printStackTrace();
  }
   finally{
	  try {
      System.out.println("Before closing connection");
      producer.flush();
      producer.close();**//got error here**
      Thread.sleep(5000);
      twitterStream.shutdown();
	  } catch(Exception e) {
		//  e.printStackTrace();
	  }
   }

}
}

Are you still facing the same issue?

After commenting out that line, it works fine. I haven't touched that code for the last two weeks. If I find the root cause, I will post it here.