java apache-kafka apache-storm

Storm reprocesses all previously processed Kafka records after a restart


I'm reading data from Kafka into a Storm topology through a Kafka spout. But when I restart Storm, it also reads the records from Kafka that were already processed. On restart, I don't want to process those records again. Here is my code:

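import java.util.UUID;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaStormSample {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        ZkHosts hosts = new ZkHosts("localhost:2181"); // placeholder: ZooKeeper used by Kafka
        String topic = "my-topic";                     // placeholder topic name

        // Random consumer id: offsets are committed under a new ZooKeeper path on every run
        SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
        kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
        // SplitBolt is the asker's own bolt (not shown)
        builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
    }
}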

Solution

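  • Use a static consumer id in SpoutConfig instead of UUID.randomUUID().toString(), for example new SpoutConfig(hosts, topic, "/" + topic, "kafka-storm-sample") (any fixed string works). The spout commits its offsets to ZooKeeper under that id, so with a random id every restart finds no stored offsets and starts again from the configured start offset, which by default is the earliest one in the topic. Along with the static id, submit the topology to a real Storm cluster with StormSubmitter instead of LocalCluster: LocalCluster runs its own in-process ZooKeeper, and unless SpoutConfig.zkServers points elsewhere, the offsets stored there are lost when the local cluster shuts down.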
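To make the mechanism concrete, here is a toy model in plain Java; it is not the storm-kafka API. A map stands in for ZooKeeper, and the hypothetical `runSpout` method commits its offset under `zkRoot + "/" + id`, roughly the way the spout keeps offsets per consumer id. It assumes that a run with no stored offset starts from offset 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Toy model only (hypothetical names, not storm-kafka classes):
// committed offsets are keyed by zkRoot + "/" + consumer id.
public class OffsetResumeDemo {
    // Stands in for ZooKeeper; it outlives a single spout "run" (i.e. a restart).
    static final Map<String, Integer> offsetStore = new HashMap<>();

    // Processes every record after the committed offset for this id,
    // then commits the new offset under the same path.
    public static List<String> runSpout(List<String> topicLog, String zkRoot, String id) {
        String path = zkRoot + "/" + id;
        int start = offsetStore.getOrDefault(path, 0); // assumption: no offset -> start at 0
        List<String> processed = new ArrayList<>(topicLog.subList(start, topicLog.size()));
        offsetStore.put(path, topicLog.size());
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c");

        // Random id: each restart looks up a fresh path, so everything is replayed.
        System.out.println(runSpout(log, "/topic", UUID.randomUUID().toString())); // [a, b, c]
        System.out.println(runSpout(log, "/topic", UUID.randomUUID().toString())); // [a, b, c]

        // Static id: the second run finds the committed offset and skips old records.
        System.out.println(runSpout(log, "/topic", "kafka-storm-sample")); // [a, b, c]
        System.out.println(runSpout(log, "/topic", "kafka-storm-sample")); // []
    }
}
```

With a fixed id, a restart after new records arrive processes only those new records, which is the behavior the question asks for. The store has to live outside the topology's lifetime; in the real setup that is why an external ZooKeeper (a real cluster via StormSubmitter, or SpoutConfig.zkServers) matters.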