
Send data from one bolt to another Apache Storm


Hi, I'm working with Apache Storm. I have multiple Kafka topics and I want to parse all their messages using a single bolt (with parallelism to handle the load).

Is this possible? Below is what I am trying:

Collection<SpoutSpec<? extends BaseRichBolt>> spouts; // I take this as a method argument

TopologyBuilder topology = new TopologyBuilder();

    spouts.forEach(spec -> {
        topology.setSpout(spec.getName() + "Spout", new KafkaSpout(spec.getSpoutConfig()), spec.getParallelism());
        topology.setBolt("FileBeat-Bolt", new FileBeatMessageBolt(), spec.getParallelism()).shuffleGrouping(spec.getName() + "Spout");
        topology.setBolt("Message-Handling-Bolt", new MessageHandlingBolt(), spec.getParallelism()).shuffleGrouping("FileBeat-Bolt");
        topology.setBolt("Output-Kafka-Bolt", new ProcessedOutputHandler(), spec.getParallelism()).shuffleGrouping("Message-Handling-Bolt");
    });

My SpoutSpec class:

public class SpoutSpec<T extends BaseRichBolt> {

    private final String name;

    private final int parallelism;

    private final SpoutConfig spoutConfig;

    private final T handler;

    // constructor and getters (getName(), getParallelism(), getSpoutConfig()) omitted

}

But the messages emitted by FileBeat-Bolt never reach the downstream bolts. Below is how I am emitting the data:

JsonNode jsonNode = objectMapper.readValue(input.getString(0), JsonNode.class);

String topic = jsonNode.get("@metadata").get("topic").getTextValue();

String message = jsonNode.get("message").getTextValue();

collector.emit("Message-Handling-Bolt", input, new Values(topic, message));

Solution

  • Your emit call is wrong. Its first parameter is not a bolt name but a stream id. Stream ids are used when you want to split a bolt's output into several separate streams, which is not what you need here.

    collector.emit("Message-Handling-Bolt", input, new Values(topic, message));
    

    emits to a stream called "Message-Handling-Bolt", and nothing is subscribed to that stream: your "Message-Handling-Bolt" bolt is subscribed to FileBeat-Bolt's default stream. Either drop the first parameter of emit, or subscribe the bolt to the named stream instead:

    topology.setBolt("Message-Handling-Bolt", new MessageHandlingBolt(), spec.getParallelism()).shuffleGrouping("FileBeat-Bolt", "Message-Handling-Bolt");
    

    Edit, in response to your comment: the easiest fix is to drop the first parameter of your emit call, so the tuple goes to the default stream:

    collector.emit(input, new Values(topic, message));
    

    If for some reason you don't want to do that and want to name the stream explicitly, FileBeatMessageBolt has to declare that it emits to the "Message-Handling-Bolt" stream, and Message-Handling-Bolt has to subscribe to that stream as in the shuffleGrouping call above. You declare the stream in your declareOutputFields implementation:

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // one field name per value emitted on this stream (two here: topic and message)
        declarer.declareStream("Message-Handling-Bolt", new Fields(...));
    }
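To make the stream-id rule concrete, here is a toy model in plain Java with no Storm dependency. `ToyTopology`, `declareStream`, `subscribe`, `emit` and `ToyTopologyDemo` are hypothetical names, not Storm's API. The model only records which streams each component declares and which bolts subscribe to each (source, stream) pair, so you can see that a tuple emitted on a stream named "Message-Handling-Bolt" reaches nobody unless something subscribes to exactly that stream. The check in `subscribe` is meant to roughly mirror the structural validation Storm performs when a topology is submitted; treat it as an illustration, not a description of Storm internals.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of routing tuples between components by stream id.
// Plain Java, no Storm dependency; every name here is hypothetical.
class ToyTopology {
    // Storm's default stream id; declarer.declare(...) and the
    // two-argument emit(anchor, values) both use it.
    static final String DEFAULT_STREAM = "default";

    // component id -> stream ids that component has declared
    private final Map<String, Set<String>> declared = new HashMap<>();
    // "sourceId/streamId" -> bolts subscribed to that stream
    private final Map<String, List<String>> subscribers = new HashMap<>();

    // Plays the role of declarer.declare(...) / declarer.declareStream(...).
    void declareStream(String component, String stream) {
        declared.computeIfAbsent(component, k -> new HashSet<>()).add(stream);
    }

    // Plays the role of shuffleGrouping(source) / shuffleGrouping(source, stream).
    // Rejects streams the source never declared.
    void subscribe(String bolt, String source, String stream) {
        Set<String> streams = declared.getOrDefault(source, Collections.emptySet());
        if (!streams.contains(stream)) {
            throw new IllegalStateException(
                bolt + " subscribes to undeclared stream '" + stream + "' of " + source);
        }
        subscribers.computeIfAbsent(source + "/" + stream, k -> new ArrayList<>()).add(bolt);
    }

    // Returns the bolts that would receive a tuple emitted by source on stream.
    // The stream id is looked up as-is; it is never matched against bolt ids.
    List<String> emit(String source, String stream) {
        return subscribers.getOrDefault(source + "/" + stream, Collections.emptyList());
    }
}

class ToyTopologyDemo {
    public static void main(String[] args) {
        // Wiring from the question: FileBeat-Bolt declares only the default stream.
        ToyTopology question = new ToyTopology();
        question.declareStream("FileBeat-Bolt", ToyTopology.DEFAULT_STREAM);
        question.subscribe("Message-Handling-Bolt", "FileBeat-Bolt", ToyTopology.DEFAULT_STREAM);
        // emit("Message-Handling-Bolt", ...) targets a stream of that name: no receivers.
        System.out.println(question.emit("FileBeat-Bolt", "Message-Handling-Bolt")); // []
        // Fix 1: emit without a stream id, i.e. on the default stream.
        System.out.println(question.emit("FileBeat-Bolt", ToyTopology.DEFAULT_STREAM)); // [Message-Handling-Bolt]

        // Fix 2: declare the named stream and subscribe to it explicitly.
        ToyTopology named = new ToyTopology();
        named.declareStream("FileBeat-Bolt", "Message-Handling-Bolt");
        named.subscribe("Message-Handling-Bolt", "FileBeat-Bolt", "Message-Handling-Bolt");
        System.out.println(named.emit("FileBeat-Bolt", "Message-Handling-Bolt")); // [Message-Handling-Bolt]
    }
}
```

Keying subscriptions on the (source, stream) pair is the point of the model: the emitting bolt never names its receivers, it only picks a stream, and the topology wiring decides who listens to that stream.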