(Also a couple of questions about timeouts and maxSpoutPending)
I see a lot of references in the Storm documentation to messages being fully processed. But how does my KafkaSpout know when a message is fully processed?
Hopefully it is aware of how my bolts are connected, so that when the final bolt in my stream acks a tuple, the spout knows my message is processed?
Otherwise, I would imagine that after the timeout period expires, the ack-ed state of a message is checked, and it is considered processed if the acking/anchoring XORs indicate so. But I hope this is not the case?
I also have related questions about maxSpoutPending and timeout configuration.
If I set maxSpoutPending to 10k, am I correct in thinking that each spout instance will continue to emit tuples until it is tracking 10k tuples in flight, i.e., 10k tuples that have not been fully processed? And that a new tuple is then emitted only when a currently in-flight message is fully processed?
And finally, does this relate to the timeout configuration? Do the spouts wait in any fashion for the configured timeout to occur before emitting new messages? Or does the timeout configuration only come into play if a message is stalled or slow in being processed, resulting in it being failed due to the timeout?
More succinctly (or hopefully more clearly): is there any effect to setting my timeout to 30 minutes, other than that messages won't be failed unless they are ack-ed by the final bolt within 30 minutes? Or are there other impacts, such as the timeout configuration affecting the emission rate of the spouts?
Sorry for the long, rambling question(s). Thanks in advance for any response.
*Edit to clarify further
The reason this is a concern for me is that my messages don't necessarily run through the entire stream.
Say I have Bolts A, B, C, D. Most of the time messages will get passed from A->B->C->D. But I have some messages that will intentionally stop at bolt A. A will ack them but not emit them (because of my business logic; in those cases I do not want further processing of the messages).
So will my KafkaSpout know that a message which is ack-ed but not emitted by A is fully processed? In this case I would like the spout to emit another message as soon as Bolt A is done with it.
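To illustrate, bolt A's execute logic is roughly the following. This is a sketch using simplified stand-in classes instead of Storm's real Tuple/OutputCollector types; the class names, the stop condition, and the payloads are all illustrative, not my actual code:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for Storm's Tuple and OutputCollector (illustrative only,
// not the real Storm classes).
class SimTuple {
    final String value;
    SimTuple(String value) { this.value = value; }
}

class SimCollector {
    final List<SimTuple> emitted = new ArrayList<>();
    final List<SimTuple> acked = new ArrayList<>();

    // Anchored emit: the real OutputCollector.emit(anchor, values) ties the
    // output tuple into the input tuple's tree.
    void emit(SimTuple anchor, SimTuple output) { emitted.add(output); }

    void ack(SimTuple input) { acked.add(input); }
}

// Sketch of "bolt A": some tuples are acked without emitting anything,
// which simply ends processing at this bolt.
class BoltA {
    void execute(SimTuple input, SimCollector collector) {
        if (stopsHere(input)) {
            collector.ack(input); // ack only -- no further processing wanted
        } else {
            collector.emit(input, new SimTuple(input.value)); // anchored emit toward B
            collector.ack(input);
        }
    }

    // Hypothetical business rule standing in for my real one.
    boolean stopsHere(SimTuple t) { return t.value.startsWith("stop"); }
}
```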
Storm tracks tuples throughout the whole topology via the anchoring mechanism that the UDF code must use. This anchoring results in a so-called tuple tree, where the root of the tree is the tuple emitted by the spout, and all other nodes (connected in a tree structure) represent the tuples emitted by bolts that used their input tuples as anchors (this is only a logical model and is not implemented this way in Storm, though).
For example, a spout emits a sentence tuple that is split into words by the first bolt, some words are filtered out by the second bolt, and a word count is applied by the third bolt. Finally, a sink bolt writes the results to a file. The tree would look like this:
"this is an example sentence" -+-> "this"
                               +-> "is"
                               +-> "an"
                               +-> "example"  -> "example"  -> ("example", 1)
                               +-> "sentence" -> "sentence" -> ("sentence", 1)
The initial sentence is emitted by the spout, used as an anchor by bolt1 for all the tokens it emits, and then acked by bolt1. Bolt2 filters out "this", "is", and "an" and just acks those three tuples. "example" and "sentence" are forwarded, used as anchors for the output tuples, and acked afterwards. The same happens in bolt3, and the final sink bolt just acks all incoming tuples.
Furthermore, Storm tracks the acks of all tuples, i.e., from intermediate bolts as well as sink bolts. First, the spout sends the ID of each output tuple to an acker task. Each time a tuple is used as an anchor, the acker also gets a message with the anchor tuple ID and the output tuple ID (which is auto-generated by Storm). The acks from the bolts go to the same acker task, which XORs them together. When all acks have been received -- i.e., for the spout tuple and all recursively anchored output tuples, so the XOR result is zero -- the acker sends a message to the spout that the tuple is fully processed, and the callback to Spout.ack(MessageId) happens (i.e., the callback is made immediately when the tuple is fully processed). Furthermore, the ackers regularly check whether any tuple has been registered with an acker for longer than the timeout. If so, the tuple ID is dropped by the acker and a message is sent to the spout that the tuple failed (resulting in a call to Spout.fail(MessageId)).
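The XOR bookkeeping can be illustrated with a small simulation. This mirrors the idea only, not Storm's actual acker code, and the hand-picked small IDs stand in for the random 64-bit IDs Storm generates:

```java
import java.util.HashMap;
import java.util.Map;

// Simulation of the acker's XOR bookkeeping (illustrative, not Storm's real code).
class AckerSim {
    // Per spout tuple: XOR of every tuple ID that was created (anchored) or acked.
    private final Map<Long, Long> ackVal = new HashMap<>();

    void spoutEmit(long rootId, long tupleId)     { xor(rootId, tupleId); }
    void anchoredEmit(long rootId, long newId)    { xor(rootId, newId); }
    void ack(long rootId, long tupleId)           { xor(rootId, tupleId); }

    private void xor(long rootId, long id) {
        ackVal.merge(rootId, id, (a, b) -> a ^ b);
    }

    // Zero means every tuple ID XORed in on creation was XORed in again on ack,
    // so the whole tree is fully processed.
    boolean fullyProcessed(long rootId) {
        return ackVal.getOrDefault(rootId, -1L) == 0L;
    }
}
```

Each ID is XORed in once when the tuple enters the tree and once when it is acked, so the running value reaches zero exactly when every created tuple has been acked, regardless of the order in which the messages arrive.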
Furthermore, the spouts keep a count of all tuples in flight and stop calling Spout.nextTuple() if this count exceeds the maxSpoutPending parameter (topology.max.spout.pending). The parameter is applied per spout task, i.e., each spout task tracks its own count of pending tuples and compares it against the configured value.
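The gating behavior can be sketched as follows. This is a simulation of the pending-count check only, not Storm's executor loop; only the maxSpoutPending name corresponds to a real config:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simulation of how a spout task throttles emission on its pending count
// (illustrative; Storm's actual executor loop is more involved).
class ThrottledSpoutSim {
    private final int maxSpoutPending;
    private final Deque<Integer> inFlight = new ArrayDeque<>();
    private int nextId = 0;
    int emittedCount = 0;

    ThrottledSpoutSim(int maxSpoutPending) { this.maxSpoutPending = maxSpoutPending; }

    // nextTuple() is only invoked while the pending count is below the cap.
    boolean tryEmit() {
        if (inFlight.size() >= maxSpoutPending) return false; // throttled: no emit
        inFlight.add(nextId++);
        emittedCount++;
        return true;
    }

    // An ack (or fail) removes a tuple from the in-flight set, freeing a slot.
    void ackOldest() {
        if (!inFlight.isEmpty()) inFlight.poll();
    }
}
```

Note that the timeout plays no role here: emission resumes the moment an ack or fail frees a slot, not after any wait.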
So the timeout parameter is independent of maxSpoutPending: the timeout only determines when an in-flight tuple is failed, while maxSpoutPending only caps how many tuples may be in flight at once. Neither causes the spout to wait for the timeout before emitting new tuples.
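In code, the two values are set independently on the topology config (Java API; the exact package is version-dependent, older Storm releases use backtype.storm instead of org.apache.storm, and the values below are just the ones from the question):

```java
import org.apache.storm.Config;

Config conf = new Config();
// Cap on in-flight (not fully processed) tuples per spout task:
conf.setMaxSpoutPending(10_000);
// Time before an in-flight tuple is failed; does not pace emission:
conf.setMessageTimeoutSecs(30 * 60);  // 30 minutes
```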