Search code examples
apache-zookeeperapache-stormapache-storm-topology

Storm large window size causing executor to be killed by Nimbus


I have a java spring application that submits topologies to a storm (1.1.2) nimbus based on a DTO which creates the structure of the topology.

This is working great except for very large windows. I am testing it with several varying sliding and tumbling windows. None are giving me any issue besides a 24 hour sliding window which advances every 15 minutes. The topology will receive ~250 messages/s from Kafka and simply windows them using a simple timestamp extractor with a 3 second lag (much like all the other topologies I am testing).

I have played with the workers and memory allowances greatly to try and figure this out but my default configuration is 1 worker with a 2048mb heap size. I've also tried reducing the lag which had minimal effects.

I think that it's possible the window size is getting too large and the worker is running out of memory which delays the heartbeats or zookeeper connection check-in which in turn cause Nimbus to kill the worker.

What happens is every so often (~11 window advances) the Nimbus logs report that the Executor for that topology is "not alive" and the worker logs for that topology show either a KeeperException where the topology can't communicate with Zookeeper or a java.lang.ExceptionInInitializerError:null with a nest PrivelegedActionException.

When the topology is assigned a new worker, the aggregation I was doing is lost. I assume this is happening because the window is holding at least 250*60*15*11 (messagesPerSecond*secondsPerMinute*15mins*windowAdvancesBeforeCrash) messages which are around 84 bytes each. To complete the entire window it will end up being 250*60*15*97 messages (messagesPerSecond*secondsPerMinute*15mins*15minIncrementsIn24HoursPlusAnExpiredWindow). This is ~1.8gbs if my math is right so I feel like the worker memory should be covering the window or at least more than 11 window advances worth.

I could increase the memory slightly but not much. I could also decrease the amount of memory/worker and increase the number of workers/topology but I was wondering if there is something I'm missing? Could I just increase the amount of time the heartbeat for the worker is so that there is more time for the executor to check-in before being killed or would that be bad for some reason? If I changed the heartbeat if would be in the Config map for the topology. Thanks!


Solution

  • This was caused by the workers running out of memory. From looking at Storm code. it looks like Storm keeps around every message in a window as a Tuple (which is a fairly big object). With a high rate of messages and a 24 hour window, that's a lot of memory.

    I fixed this by using a preliminary bucketing bolt that would bucket all the tuples in an initial 1 minute window which reduced the load on the main window significantly because it was now receiving one tuple per minute. The bucketing window doesn't run out of memory since it only has one minute of tuples at a time in its window.