For example, my topology code looks like this:
builder.setSpout("spout", new KafkaSpout(spoutConfig)); // spoutConfig: your Kafka spout configuration (not shown)
builder.setBolt("bolt1", new Bolt1()).shuffleGrouping("spout");
builder.setBolt("bolt2", new Bolt2()).shuffleGrouping("bolt1");
builder.setBolt("bolt3", new Bolt3()).shuffleGrouping("bolt2");
When bolt1 emits, the message is automatically acked. But when bolt2 or bolt3 throws an exception, the message is not resent. How can I retrieve the failed message?
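Storm has the concept of tuple trees at the heart of its reliability mechanism. Let me try to explain using the example in your question.

When your spout calls the collector.emit method with a message ID, the newly emitted tuple, let's call it tuple1, becomes the root of a tuple tree. This tuple reaches bolt1, since bolt1 subscribes to the spout's output. Once bolt1 receives tuple1 as input in its execute method, it processes it and emits a new value, tuple2, which is anchored to tuple1 and therefore added to the tuple tree below it. Before execute returns, tuple1 is acknowledged by calling collector.ack, which tells Storm that tuple1 has been processed. The tree is not finished yet, though: tuple2 is still pending and is passed on to bolt2 for processing. The spout's message counts as fully processed only after every tuple in the tree has been acked.

If bolt1 extends BaseBasicBolt, the anchoring and the ack happen implicitly. If it extends BaseRichBolt, you have to anchor the emit yourself with collector.emit(input, new Values(...)) and call collector.ack(input). A tuple emitted without an anchor is not part of the tree, so a failure in bolt2 or bolt3 will not cause the spout's message to be replayed.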
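To make the tuple tree idea concrete, here is a hypothetical, single-process sketch of the XOR bookkeeping that Storm's acker uses: every tuple gets a random 64-bit ID, each ID is XORed into a per-tree value once when the tuple is created and once when it is acked, and the tree is complete when the value returns to zero. The class AckerSketch and its methods are illustrative names, not Storm's API; the real acker runs as separate bolt tasks and receives these updates as messages.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical model of the acker's XOR bookkeeping for tuple trees.
// Not Storm's API: the real acker runs as its own bolt tasks.
class AckerSketch {
    // Root (spout) tuple id -> XOR of every tuple id created or acked in its tree.
    private final Map<Long, Long> ackVals = new HashMap<>();
    private final Random random = new Random();

    // Random, non-zero 64-bit id for a new tuple.
    long newTupleId() {
        long id;
        do {
            id = random.nextLong();
        } while (id == 0L);
        return id;
    }

    // The spout emits a tuple with a message id: start tracking a new tree.
    void rootEmitted(long rootId) {
        ackVals.put(rootId, rootId);
    }

    // A bolt emits childId anchored to an input that belongs to rootId's tree.
    void anchoredEmit(long rootId, long childId) {
        ackVals.merge(rootId, childId, (a, b) -> a ^ b);
    }

    // A bolt acks tupleId, which belongs to rootId's tree.
    void acked(long rootId, long tupleId) {
        ackVals.merge(rootId, tupleId, (a, b) -> a ^ b);
    }

    // Complete once every id has been XORed in twice (created, then acked).
    boolean isComplete(long rootId) {
        return ackVals.getOrDefault(rootId, 0L) == 0L;
    }

    public static void main(String[] args) {
        AckerSketch acker = new AckerSketch();
        long tuple1 = acker.newTupleId();      // spout emits tuple1
        acker.rootEmitted(tuple1);
        long tuple2 = acker.newTupleId();      // bolt1 emits tuple2 anchored to tuple1
        acker.anchoredEmit(tuple1, tuple2);
        acker.acked(tuple1, tuple1);           // bolt1 acks tuple1
        System.out.println("complete after bolt1 acks: " + acker.isComplete(tuple1)); // false
        acker.acked(tuple1, tuple2);           // bolt2 acks tuple2
        System.out.println("complete after bolt2 acks: " + acker.isComplete(tuple1)); // true
    }
}
```

The same model shows why an unanchored emit loses failures: if bolt1 emits tuple2 without anchoring, anchoredEmit is never called, so acking tuple1 alone brings the value back to zero and whatever happens in bolt2 is invisible to the spout's tree.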
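Now the question arises: what happens if bolt1 is unable to acknowledge its input for some reason, for example because it threw an exception? After a certain period, the topology message timeout (topology.message.timeout.secs, 30 seconds by default), Storm sees that the tuple tree has not been completed, so it fails the tuple by calling the spout's fail method. A reliable spout such as KafkaSpout then replays the tuple from the start, and the same process described above repeats. A bolt can also call collector.fail(input) to trigger the failure immediately instead of waiting for the timeout.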
Hope this explains what happens on failure. For more detail, please read this or watch this.