java, sql, apache-storm, thread-sleep

How to stop tuple processing in Storm and execute other code


I'm new to Storm and am using it for a university project.

I created my topology with a Spout linked to a MySQL database and two Bolts. The first bolt, connected to the spout, cleans the tuples by removing unnecessary information; the second bolt filters the tuples.

I'm working in local mode.

My question is: why, after running the topology, do I see output like the lines below in my console?

38211 [Thread-14-movie-SPOUT] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
67846 [Thread-10-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67846 [Thread-8-cleaning-genre-bolt] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67852 [Thread-10-__acker] INFO  backtype.storm.daemon.task - Emitting: __acker __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@3c270095> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=0, write_pos=1, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {}]>]]
67853 [Thread-8-cleaning-genre-bolt] INFO  backtype.storm.daemon.task - Emitting: cleaning-genre-bolt __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@38c3d111> [#<DataPoint [__emit-count = {default=1680}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1621, write_pos=1622, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {default=1680}]> #<DataPoint [__execute-latency = {movie-SPOUT:default=0.15476190476190477}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=1680, write_pos=1680, capacity=1024, population=0}]> #<DataPoint [__execute-count = {movie-SPOUT:default=1680}]>]]
67854 [Thread-13-filtering-genre-BOLT] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67855 [Thread-13-filtering-genre-BOLT] INFO  backtype.storm.daemon.task - Emitting: filtering-genre-BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@6d5c75a9> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1681, write_pos=1682, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {cleaning-genre-bolt:default=0.08333333333333333}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {cleaning-genre-bolt:default=1680}]>]]

I read that these lines, which appear after the last tuple has been processed, are to be considered normal. Is that right?

And how can I run other code after submitting the topology? For example, I want to print the results of the filtering done in the second bolt, which are saved in a HashMap. If I put my code after the line containing the submitTopology() call, the code is run before all the tuples have been processed.

My second and last question is: why do I see the following in the Spout of every Storm example?

"Thread.sleep(1000)"?

Maybe it's linked to my first question.

I hope my questions are clear. Thank you in advance!


Solution

  • I read that these lines, which appear after the last tuple has been processed, are to be considered normal. Is that right?

    Those are just INFO-level log messages (here, Storm's internal tick and metrics reporting), so there is no need to worry about them.

    If I put my code after the line containing the submitTopology() call, the code is run before all the tuples have been processed.

    When you submit your topology, it is executed in the background (i.e., in separate threads), so submitTopology() returns immediately. This is required because your topology runs "forever": until you stop it explicitly or, since you are running in local mode, until your Java application terminates.

    Running code "after your topology finished" does not align with Storm's concepts. Storm is a streaming system, so there is "no end of processing": the input stream is assumed to be infinite, and processing therefore runs forever. If you want to process a finite data set, you might want to consider a batch processing framework like Flink or Spark.

    So, if you want to make this work in Storm, you need a way to determine when all data has been processed. After submitting the topology, you then block and wait explicitly until that point is reached.
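
    The "submit, then block until done" idea can be sketched with plain Java threads. This is only an illustration under stated assumptions, not Storm code: LocalRun, submit(), awaitDone() and counts() are hypothetical names, and the background thread merely stands in for the topology. In a real local-mode topology the last bolt would have to count down the latch itself (for example when it sees an end-of-data marker emitted by the spout, which is also an assumption), and the latch would typically have to be reachable through a static field, since Storm works on serialized copies of your bolts.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Stand-in for a local-mode run; none of these names are Storm APIs.
final class LocalRun {
    private final CountDownLatch done = new CountDownLatch(1);
    private final Map<String, Integer> counts = new ConcurrentHashMap<>();

    // Like submitTopology(): starts the work in the background and returns at once.
    void submit(List<String> genres) {
        Thread worker = new Thread(() -> {
            for (String genre : genres) {
                counts.merge(genre, 1, Integer::sum); // the "last bolt" tallying results
            }
            done.countDown(); // signal: the finite input has been fully processed
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Blocks the caller until the signal arrives or the timeout expires.
    boolean awaitDone(long timeoutSeconds) {
        try {
            return done.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    Map<String, Integer> counts() {
        return counts;
    }

    public static void main(String[] args) {
        LocalRun run = new LocalRun();
        run.submit(Arrays.asList("drama", "comedy", "drama"));
        // Code placed here without awaitDone() could run before processing ends.
        if (run.awaitDone(10)) {
            System.out.println("Results: " + run.counts());
        } else {
            System.out.println("Timed out before all input was processed");
        }
    }
}
```

    The timeout is there so that the main thread does not hang forever if the completion signal never arrives.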

    However, for your use case, why not just print your result from within the last bolt?
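
    As a rough sketch of that suggestion, the processing step itself can report its running result instead of handing it back to the code that submitted the topology. The class below is a plain-Java stand-in, not a Storm bolt: FilteringStep, reportEvery and summary() are hypothetical names, and execute(String) only plays the role that a bolt's execute method would play for each incoming tuple.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in for the last bolt; not a Storm class.
final class FilteringStep {
    private final Map<String, Integer> counts = new TreeMap<>();
    private final int reportEvery;
    private int seen = 0;

    FilteringStep(int reportEvery) {
        if (reportEvery < 1) {
            throw new IllegalArgumentException("reportEvery must be >= 1");
        }
        this.reportEvery = reportEvery;
    }

    // Called once per incoming tuple: update the result, print it periodically.
    void execute(String genre) {
        counts.merge(genre, 1, Integer::sum);
        seen++;
        if (seen % reportEvery == 0) {
            System.out.println(summary());
        }
    }

    // Current running result; TreeMap keeps the genres in sorted order.
    String summary() {
        return "after " + seen + " tuples: " + counts;
    }

    public static void main(String[] args) {
        FilteringStep step = new FilteringStep(2);
        for (String genre : new String[] {"drama", "comedy", "drama", "horror"}) {
            step.execute(genre);
        }
    }
}
```

    Printing every N tuples fits the streaming model: there is never a "final" result, only the result so far.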

    As for Thread.sleep(), I am not sure which examples you are referring to, and I see no reason to put it in production code. Maybe it is there for demo purposes, to slow down processing artificially.