
apache storm - run jar in single node correctly but not in multi nodes


I am new to Apache Storm. I wrote a topology with 1 spout and 2 bolts. When I run all 3 components on one worker, the code generates output correctly, but when I run it on three workers (one worker executes the spout, another runs bolt 1, and the last runs bolt 2), no output is generated. To be specific: when I put bolts 1 and 2 together in one worker, the output is generated!

I should say that the emit itself works successfully, and there is no problem with the emitted values.

In detail: I built a tree in a HashMap structure in bolt 1, and I want to mine this tree in bolt 2. The ids of the objects inserted into the tree in bolt 1 look like "MyTreeNode@e70014d5", but when I receive this tuple (the HashMap) in bolt 2, the id has changed to something like "MyTreeNode@z5542r12".

What is the main problem?

Is the problem caused by the changing object id? If so, could you please tell me how I can solve it?


Solution

  • Let's look at an example topology.

    Let's say your topology goes spout -> bolt1, and you're emitting MyObject instances from the spout.

    Let's say you've set up the topology to run in 1 worker.

    When a tuple (e.g. MyObject@1234) is emitted from the spout, Storm checks whether the tuple needs to go to another worker. If not, it simply passes the object reference along to bolt1: the bolt receives the very same MyObject@1234 reference. This is what you are seeing when you have only 1 worker.

    Now let's say you tell the topology to use 2 workers, and Storm decides to put the spout in worker 1 and the bolt in worker 2. Recall that each worker is a separate JVM process, so passing the object reference from worker 1 to worker 2 won't work.

    When the tuple is emitted from the spout, Storm will see that it is going to another worker, and serialize it using either Kryo or Java serialization depending on your configuration. This means that MyObject@1234 gets serialized. Storm hands the serialized form to worker 2, which deserializes it. When it is deserialized, it is very reasonably given a new memory address (e.g. MyObject@6789).
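    You can see this effect with plain Java serialization, outside Storm entirely. Here is a minimal sketch (the `Node` class is a hypothetical stand-in for your MyTreeNode): a serialize/deserialize round trip produces an object with the same data but a new identity, which is exactly what the default `toString()` form `ClassName@<hash>` reflects.

    ```java
    import java.io.*;

    public class SerializationDemo {
        // Hypothetical stand-in for MyTreeNode from the question.
        static class Node implements Serializable {
            private static final long serialVersionUID = 1L;
            final String label;
            Node(String label) { this.label = label; }
        }

        // Serialize and immediately deserialize, mimicking what happens
        // when a tuple crosses a worker (JVM) boundary.
        static Node roundTrip(Node original) throws Exception {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Node) in.readObject();
            }
        }

        public static void main(String[] args) throws Exception {
            Node original = new Node("root");
            Node copy = roundTrip(original);
            // Same data, different object: the default toString()
            // (Node@<hash>) differs, just like in the question.
            System.out.println(original == copy);                  // false
            System.out.println(original.label.equals(copy.label)); // true
        }
    }
    ```

    The data survives the round trip intact; only the object identity (and thus the `@hash` in the printed id) changes, so bolt 2 should compare tuples by their contents, never by reference or by the printed id.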

    This is not an issue if you design your bolts to assume that they are not running in the same JVM, which you should absolutely do. For example, if you want to transfer a MyObject from worker 1 to worker 2, you might make it Serializable, or you might register it with Kryo (see how at https://storm.apache.org/releases/2.0.0-SNAPSHOT/Serialization.html). You need to do this so Storm can put your spouts and bolts in separate JVMs without breaking your topology.
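    A minimal sketch of the Kryo route, assuming a MyTreeNode class of your own, might look like this (a config fragment, not a complete topology):

    ```java
    import org.apache.storm.Config;

    Config conf = new Config();
    // Register the payload class with Kryo so Storm can serialize it
    // when a tuple crosses a worker boundary.
    conf.registerSerialization(MyTreeNode.class);
    // Note: Kryo's default FieldSerializer generally needs the class to
    // have a no-arg constructor; otherwise register a custom Serializer.
    ```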

    When you are testing your topology, you should enable https://storm.apache.org/releases/1.2.2/javadocs/org/apache/storm/Config.html#TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE. This causes Storm to always serialize your tuples, even when a tuple isn't being transferred between workers, so you can catch serialization issues before they make it into production.
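    Enabling that flag is a one-line config change (again just a fragment, assuming you submit the topology with this `conf`):

    ```java
    import org.apache.storm.Config;

    Config conf = new Config();
    // Force serialization of every tuple, even within a single worker,
    // so serialization bugs surface during local testing.
    conf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
    ```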

    As an aside, you should always prefer Kryo serialization to Java serialization. Kryo serialization is much faster.