apache-kafka, apache-storm, kafka-consumer-api

How does Storm's KafkaSpout know that all bolts have executed?


For example, my topology code looks like this:

builder.setSpout("spout", new KafkaSpout);
builder.setBolt("bolt1", new Bolt1).shuffleGrouping("spout");
builder.setBolt("bolt2", new Bolt2).shuffleGrouping("bolt1");
builder.setBolt("bolt3", new Bolt3).shuffleGrouping("bolt2");

When bolt1 emits, the message is automatically acked. But when an exception occurs in bolt2 or bolt3, the message is not re-sent. How can I retrieve the failed message?


Solution

  • Storm is built around the concept of tuple trees. Let me try to explain using the example provided in your question.

    When your spout calls collector.emit, the newly emitted tuple, let's call it tuple1, is added to the tuple tree. This tuple reaches bolt1, since bolt1 subscribes to the spout and receives whatever it emits. Inside bolt1's execute method the input is processed and a new value is emitted as tuple2, which is anchored to tuple1 and added to the tuple tree after it. Before execute returns, the input is acknowledged by calling collector.ack (this happens automatically if the bolt extends BaseBasicBolt), which tells Storm that tuple1 has been processed and can be removed from the tree; tuple2 remains and is passed on to bolt2 for processing. A sketch of a bolt that anchors and acks explicitly is at the end of this answer.

    Now the question arises: what happens if bolt1 (or any bolt further down the tree) fails to acknowledge for some reason? After a certain period of time, the topology's message timeout (30 seconds by default), Storm sees that the tuple tree has not been fully acked, calls fail() on the spout, and the KafkaSpout replays the tuple from the start, after which the same process follows. The second sketch at the end of this answer shows how this timeout can be adjusted.

    Hope I was able to explain what happens on failure. For more detail, please read this or watch this.
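
    To answer the original question directly: the failed message can only be replayed if every bolt anchors the tuples it emits and explicitly acks or fails its input (or extends BaseBasicBolt, which does both for you). Below is a minimal sketch of what bolt2 could look like; the imports assume the org.apache.storm package names, and the field name and the toUpperCase "processing" are placeholders, not taken from your code.

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    import java.util.Map;

    public class Bolt2 extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            try {
                // Stand-in for whatever processing bolt2 really does.
                String processed = input.getString(0).toUpperCase();
                // Anchoring: passing `input` as the first argument links the new
                // tuple to the tuple tree, so a failure downstream reaches the spout.
                collector.emit(input, new Values(processed));
                collector.ack(input);   // this node of the tree is done
            } catch (Exception e) {
                collector.fail(input);  // fail right away instead of waiting for the timeout
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("value"));
        }
    }

    Because the emit is anchored to the input, a fail() in bolt3, or a timeout anywhere in the tree, propagates back to the KafkaSpout, which re-emits the Kafka message.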
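
    And here is a sketch of how the message timeout mentioned above can be changed when submitting the topology; the topology name and the timeout value are made up for illustration.

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;

    public class TopologyRunner {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // ... setSpout / setBolt wiring exactly as in the question ...

            Config conf = new Config();
            // How long Storm waits for the whole tuple tree to be acked before it
            // calls fail() on the spout and the KafkaSpout replays the message.
            conf.setMessageTimeoutSecs(60);   // topology.message.timeout.secs, default 30
            conf.setNumAckers(1);             // at least one acker is needed for tracking and replay

            StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());
        }
    }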