I'm reading data from Kafka into a Storm spout. But when I restart Storm, it also reads records from Kafka that were already processed. On restart, I don't want to process those records again. Here is my code:
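import java.util.UUID;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.*; // storm-kafka 1.x (assumed): BrokerHosts, ZkHosts, SpoutConfig, KafkaSpout, StringScheme
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
public class KafkaStormSample {
    public static void main(String[] args) throws Exception {
        BrokerHosts hosts = new ZkHosts("localhost:2181"); // assumed ZooKeeper address
        String topic = "my-topic";                         // assumed topic name
        Config config = new Config();
        SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
        kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
        builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout"); // SplitBolt: my own bolt
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
    }
}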
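Use a static id instead of UUID.randomUUID().toString(), for example new SpoutConfig(hosts, topic, "/" + topic, "kafka-storm-sample"). The spout commits its consumer offsets to ZooKeeper under zkRoot + "/" + id, so a new random id on every run means the previously committed offsets are never found and the spout falls back to its configured start offset (by default the earliest one), which is why old records are processed again.

Along with a static id, you can use StormSubmitter to submit topologies to run on the Storm cluster. If zkServers is not set on the SpoutConfig, the offsets are stored in Storm's own ZooKeeper, and a LocalCluster runs an in-process ZooKeeper whose state does not survive a restart.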
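To see why the id matters, here is a toy in-memory model. It is not the storm-kafka API: a HashMap stands in for ZooKeeper, and offsets are keyed by a path built from zkRoot, the consumer id and the partition, loosely mirroring the zkRoot/id/partition_N layout that storm-kafka is assumed to use. The class OffsetLookupDemo, its methods, and the fallback offset of 0 (standing in for "earliest") are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Toy model (hypothetical, not the storm-kafka API): committed offsets are
 * looked up by a path derived from zkRoot, the consumer id and the partition.
 */
public class OffsetLookupDemo {
    // Stands in for ZooKeeper: path -> last committed offset.
    private final Map<String, Long> store = new HashMap<>();

    // Hypothetical helper: builds a path like "/my-topic/<id>/partition_0".
    public static String offsetPath(String zkRoot, String id, int partition) {
        return zkRoot + "/" + id + "/partition_" + partition;
    }

    // Records the last processed offset for this consumer id and partition.
    public void commit(String zkRoot, String id, int partition, long offset) {
        store.put(offsetPath(zkRoot, id, partition), offset);
    }

    // Returns the committed offset, or the fallback if nothing was stored under this id.
    public long startOffset(String zkRoot, String id, int partition, long fallback) {
        return store.getOrDefault(offsetPath(zkRoot, id, partition), fallback);
    }

    public static void main(String[] args) {
        OffsetLookupDemo zk = new OffsetLookupDemo();
        String zkRoot = "/my-topic";

        // First run with a random id commits offset 100.
        zk.commit(zkRoot, UUID.randomUUID().toString(), 0, 100L);
        // "Restart": a fresh random id finds no committed offset and falls back to 0.
        String newRandomId = UUID.randomUUID().toString();
        System.out.println("random id restart starts at " + zk.startOffset(zkRoot, newRandomId, 0, 0L));

        // With a static id, the restart finds the offset committed by the previous run.
        String staticId = "kafka-storm-sample";
        zk.commit(zkRoot, staticId, 0, 100L);
        System.out.println("static id restart starts at " + zk.startOffset(zkRoot, staticId, 0, 0L));
    }
}
```

Running main prints a start offset of 0 for the random-id restart and 100 for the static-id restart, which is the same effect the real spout shows when the id changes between runs.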