Search code examples
apache-storm

Wait for submitToplogy to finish


I am reading the storm applied book. I found the following code snippet in the book

LocalCluster lc = new LocalCluster()
lc.submitTopology("GitHub-commit-count-topology"), config, topology);
Utils.sleep(TEN_MINUTES)
lc.killTopology("GitHub-commit-count-topology")
lc.shutdown()

So this code will submit the topology for execution wait for fixed 10 minutes and then kill the topology. But this is odd. How can I say. submitTopology wait for it to complete and completed. kill and shutdown.

Like in Akka Streams we get Future[Done] and we just wait on that future to complete. (rather than fixed 10 minutes).


Solution

  • You can do this with https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/Testing.java#L376.

    The reason this isn't used in some cases is that it requires every spout in the topology to implement the CompletableSpout interface https://github.com/apache/storm/blob/4137328b75c06771f84414c3c2113e2d1c757c08/storm-client/src/jvm/org/apache/storm/testing/CompletableSpout.java.

    Most Storm spouts never reach a point where they're "done" (since it's a stream processing framework, not a batch processing framework), so there's no way to tell when the topology is finished. For example, if you're consuming messages from a Kafka topic, the producers may at any point add more messages to the topic, so how will the consumer determine it is finished consuming?

    CompletableSpout exists mostly to ease testing, because it's then possible for a spout to say whether it is done. The completeTopology method I linked can then use this extra feature to tell whether all spouts in the topology are "done", and can stop the topology after that.

    If the spout you're using in a test doesn't implement CompletableSpout (which most spouts don't), there's no way to tell when the topology is finished in general. In many cases you can still do better than the example you linked, e.g. if my topology is supposed to write 10 messages to a queue in the test, I can make the test end once 10 messages have been written to the queue.

    To relate to Akka streams, I'm not really familiar with them, but looking at the introductory documentation, you could consider CompletableSpouts to be similar to bounded Sources (eg. a Source(1 to 100)), while "normal" spouts are unbounded Sources (e.g. a Source.repeat(1)).