
Tumbling window in Storm with both Count and Duration


How can I create a tumbling window in Storm that has both thresholds? For example, if I set a window count of 500 and a window duration of 5 seconds, the window should be processed even if fewer than 500 messages have arrived, as long as 5 seconds have elapsed. I can see separate APIs for each threshold.

For Count

.tumblingWindow(1000, windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))

For Time

.tumblingWindow(Duration.seconds(5), windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))

Can I combine the two?

If I configure the window by message count rather than by duration, what happens to my messages when I stop the topology? Will Storm process the buffered messages even if the batch count has not been reached, or will I lose them?


Solution

  • I don't believe you can do this with the current windowing API.

    The code is pluggable enough to allow for it internally, but the API you need isn't exposed. There are two interfaces to define how windows are processed.

    TriggerPolicy decides when to deliver windows to the bolt (e.g. "deliver when 100 tuples have been buffered").

    EvictionPolicy decides when to evict tuples from the current window (e.g. "discard tuples once they're more than 500 tuples behind the newest tuple in the window").

    You configure these policies indirectly, e.g. via BaseWindowedBolt.withWindowLength, which internally just sets some configuration properties. WindowedBoltExecutor then reads those properties to decide which trigger and eviction policies to use.

    I think what is needed is either to let users provide their own custom TriggerPolicy/EvictionPolicy, or alternatively to add a new TriggerPolicy/EvictionPolicy to Storm that does what you want.

    If you want to raise an issue for this, you can do so at https://issues.apache.org/jira/projects/STORM. If you would like to contribute code, the source is available at https://github.com/apache/storm, where you can also raise a PR.
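
To make the requested behavior concrete, here is a minimal sketch of "close the window on whichever threshold is hit first". It is plain Java and deliberately does not use any Storm classes: CountOrDurationWindow, add, onTimer and drain are hypothetical names invented for this illustration, not part of Storm's windowing API. It also assumes the duration is measured from the first item buffered in the current window, and that something outside the class calls onTimer periodically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (not a Storm class): a tumbling window that closes
 * when it holds maxCount items or when its oldest item is maxAgeMillis old,
 * whichever happens first.
 */
final class CountOrDurationWindow<T> {
    private final int maxCount;
    private final long maxAgeMillis;
    private final LongSupplier clock;      // injected so the timing can be tested
    private final List<T> buffer = new ArrayList<>();
    private long windowStart;              // time the first item of this window arrived

    CountOrDurationWindow(int maxCount, long maxAgeMillis, LongSupplier clock) {
        if (maxCount <= 0 || maxAgeMillis <= 0) {
            throw new IllegalArgumentException("thresholds must be positive");
        }
        this.maxCount = maxCount;
        this.maxAgeMillis = maxAgeMillis;
        this.clock = clock;
    }

    /** Buffers an item; returns the closed window if a threshold was hit, else an empty list. */
    List<T> add(T item) {
        if (buffer.isEmpty()) {
            windowStart = clock.getAsLong();
        }
        buffer.add(item);
        return (buffer.size() >= maxCount || expired()) ? drain() : new ArrayList<>();
    }

    /** Call periodically; returns the window if the duration threshold has passed, else an empty list. */
    List<T> onTimer() {
        return expired() ? drain() : new ArrayList<>();
    }

    /** Returns whatever is buffered and starts a new window, e.g. for a flush on shutdown. */
    List<T> drain() {
        List<T> window = new ArrayList<>(buffer);
        buffer.clear();
        return window;
    }

    private boolean expired() {
        return !buffer.isEmpty() && clock.getAsLong() - windowStart >= maxAgeMillis;
    }
}
```

With maxCount = 500 and maxAgeMillis = 5000, a caller would pass System::currentTimeMillis as the clock, hand each incoming message to add, and invoke onTimer from some periodic signal. A real TriggerPolicy would need the same two inputs (tuple arrivals and a timer), which is why this cannot be expressed through the count-only or duration-only settings alone.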