Tags: apache-kafka, apache-storm

Storm duration-based windows should continue execution if there is no input


I'm working on a Storm bolt that implements BaseStatefulWindowedBolt to calculate a windowed running sum of numbers coming in from Kafka.

I create the bolt with something like:

```java
builder.setBolt(sumBoltId, new SumBolt()
        .withWindow(Duration.seconds(10), Duration.seconds(1))
        .withMessageIdField("msgId"))
    .shuffleGrouping(sourceBoltId, boltStreamId);
```

This all works fine, but I noticed that when the input from Kafka stops, the sum (which I'm logging) slowly drops and then the logging stops entirely. I expect the sum to drop, but after that I'd like the bolt to keep processing windows even when a window contains no tuples.

So if I stop the source of numbers, I expect the sum to drop over the next 10 seconds and then to log a sum of 0 every second.

Has anyone encountered this and found a solution or should I figure out another way to publish the windows? Thanks!


Solution

  • One possible solution is to use tick tuples in Storm, which are special periodic tuples that can be used to convey special meanings to your bolts. In your case, you can set a tick tuple frequency of 10s; after you have summed up your Kafka messages, detect these tick tuples in your bolt code and continue summing on each one.
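A minimal sketch of the tick-driven idea, assuming the bolt keeps its own window rather than relying on `BaseStatefulWindowedBolt`. Normal tuples are recorded with their arrival time; each tick evicts values older than the window length and reports the current sum, so a sum of 0 keeps being reported after the input stops. `TickWindowSum`, `add` and `onTick` are hypothetical names and the class uses no Storm APIs. In an actual bolt you would presumably call `add` from `execute()` for normal tuples and `onTick` when `TupleUtils.isTick(tuple)` is true, with the tick interval set through `Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS` in the bolt's `getComponentConfiguration()`. The demo ticks every second to match the 1-second slide in the question.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical helper: a time-based sliding-window sum reported on every tick,
 * even when the window is empty. Plain Java, no Storm APIs.
 * Assumes values are added in non-decreasing timestamp order.
 */
class TickWindowSum {
    // One value together with the time (ms) at which it arrived.
    private static final class Entry {
        final long timestampMs;
        final long value;

        Entry(long timestampMs, long value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    private final long windowLengthMs;
    private final Deque<Entry> entries = new ArrayDeque<>();
    private long runningSum = 0;

    TickWindowSum(long windowLengthMs) {
        this.windowLengthMs = windowLengthMs;
    }

    /** Records a value from a normal (non-tick) tuple. */
    void add(long timestampMs, long value) {
        entries.addLast(new Entry(timestampMs, value));
        runningSum += value;
    }

    /**
     * Handles a tick: evicts values that fall outside (now - window, now]
     * and returns the sum of what remains, which is 0 for an empty window.
     */
    long onTick(long nowMs) {
        while (!entries.isEmpty()
                && entries.peekFirst().timestampMs <= nowMs - windowLengthMs) {
            runningSum -= entries.pollFirst().value;
        }
        return runningSum;
    }

    public static void main(String[] args) {
        TickWindowSum window = new TickWindowSum(10_000); // 10 s window
        window.add(0, 5);
        window.add(4_000, 3);
        // Input stops after t = 4 s, but ticks keep arriving every second.
        for (long t = 1_000; t <= 16_000; t += 1_000) {
            System.out.println("t=" + t + "ms sum=" + window.onTick(t));
        }
    }
}
```

Timestamps are passed in explicitly so the logic can be checked without a running topology; inside a bolt you would likely use `System.currentTimeMillis()`. Note that keeping the window yourself gives up the checkpointed window state that `BaseStatefulWindowedBolt` manages, so persisting the deque (if you need it) would be up to you.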