Hi, I'm working with Apache Storm. I have multiple Kafka topics and I want to parse all the messages using a single bolt (with parallelism to handle the load).
Is that possible? Below is what I am trying:
Collection<SpoutSpec<? extends BaseRichBolt>> spouts; // I take this as a method argument
TopologyBuilder topology = new TopologyBuilder();
spouts.forEach(spec -> {
    topology.setSpout(spec.getName() + "Spout", new KafkaSpout(spec.getSpoutConfig()), spec.getParallelism());
    topology.setBolt("FileBeat-Bolt", new FileBeatMessageBolt(), spec.getParallelism()).shuffleGrouping(spec.getName() + "Spout");
    topology.setBolt("Message-Handling-Bolt", new MessageHandlingBolt(), spec.getParallelism()).shuffleGrouping("FileBeat-Bolt");
    topology.setBolt("Output-Kafka-Bolt", new ProcessedOutputHandler(), spec.getParallelism()).shuffleGrouping("Message-Handling-Bolt");
});
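A note on the first part of the question (one bolt fed by several Kafka topics): Storm allows a bolt to subscribe to several upstream components, but each component id should be declared only once. As far as I know, `TopologyBuilder` rejects a second `setBolt` call with an id it has already seen (it throws an `IllegalArgumentException`), so the loop above works with one `SpoutSpec` but not with several. Since `setBolt` returns a `BoltDeclarer`, one option is to declare each bolt once before the loop and call `shuffleGrouping(spec.getName() + "Spout")` on that declarer for every spout. The sketch below is a plain-Java toy model of that wiring, not Storm code; the `FanInSketch` class and the topic names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of topology wiring (NOT Storm's TopologyBuilder): every component id
// is declared once, and a bolt may subscribe to several upstream components.
public class FanInSketch {
    // component id -> upstream component ids it subscribes to (insertion-ordered)
    private final Map<String, List<String>> inputs = new LinkedHashMap<>();

    // Declare a component; a repeated id is rejected, like a duplicate setBolt/setSpout.
    public void declare(String id) {
        if (inputs.containsKey(id)) {
            throw new IllegalArgumentException("Component already declared: " + id);
        }
        inputs.put(id, new ArrayList<>());
    }

    // Record that boltId reads from sourceId (one grouping call per source).
    public void subscribe(String boltId, String sourceId) {
        inputs.get(boltId).add(sourceId);
    }

    public List<String> inputsOf(String id) {
        return inputs.get(id);
    }

    // Declares the shared bolt once, then one spout per topic feeding into it.
    public static FanInSketch wire(List<String> topics) {
        FanInSketch topology = new FanInSketch();
        topology.declare("FileBeat-Bolt");
        for (String topic : topics) {
            String spoutId = topic + "Spout";
            topology.declare(spoutId);
            topology.subscribe("FileBeat-Bolt", spoutId);
        }
        return topology;
    }

    public static void main(String[] args) {
        // Hypothetical topic names.
        FanInSketch topology = wire(List.of("topicA", "topicB"));
        System.out.println(topology.inputsOf("FileBeat-Bolt")); // [topicASpout, topicBSpout]
    }
}
```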
My SpoutSpec class:
public class SpoutSpec<T extends BaseRichBolt> {
    private final String name;
    private final int parallelism;
    private final SpoutConfig spoutConfig;
    private final T handler;
    // constructor and getters (getName(), getParallelism(), getSpoutConfig(), ...) omitted
}
But the messages aren't getting emitted from FileBeat-Bolt to the other bolts. Below is how I am emitting the data:
JsonNode jsonNode = objectMapper.readValue(input.getString(0), JsonNode.class);
String topic = jsonNode.get("@metadata").get("topic").getTextValue();
String message = jsonNode.get("message").getTextValue();
collector.emit("Message-Handling-Bolt", input, new Values(topic, message));
Your emit call is wrong. The first parameter is not a bolt name but a stream name. Stream names are used when you want to split the messages from a bolt into multiple data streams. In your case, you don't want to split the stream.
collector.emit("Message-Handling-Bolt", input, new Values(topic, message));
will emit to a stream called "Message-Handling-Bolt", and nothing is listening on that stream: your Message-Handling-Bolt subscribes to FileBeat-Bolt's default stream. Either drop the first parameter to emit, or change your bolt declaration to this:
topology.setBolt("Message-Handling-Bolt", new MessageHandlingBolt(), spec.getParallelism()).shuffleGrouping("FileBeat-Bolt", "Message-Handling-Bolt");
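To make the stream-id point concrete, here is a toy model in plain Java (not Storm's implementation) of how subscriptions are matched: a subscriber registers for a (source component, stream id) pair, and a tuple only reaches subscribers of exactly that pair. The `"default"` constant mirrors Storm's default stream id; the `StreamRoutingSketch` class and its method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of stream routing (NOT Storm's implementation): a tuple emitted by
// a source on a stream reaches only bolts subscribed to that exact (source, stream).
public class StreamRoutingSketch {
    public static final String DEFAULT_STREAM = "default"; // Storm's default stream id

    private final Map<String, List<String>> subscribers = new HashMap<>();

    private static String key(String source, String stream) {
        return source + "/" + stream;
    }

    // Like shuffleGrouping(source): no stream id means the default stream.
    public void subscribe(String bolt, String source) {
        subscribe(bolt, source, DEFAULT_STREAM);
    }

    // Like shuffleGrouping(source, streamId).
    public void subscribe(String bolt, String source, String stream) {
        subscribers.computeIfAbsent(key(source, stream), k -> new ArrayList<>()).add(bolt);
    }

    // Returns the bolts that would receive a tuple emitted by source on stream.
    public List<String> emit(String source, String stream) {
        return subscribers.getOrDefault(key(source, stream), List.of());
    }

    public static void main(String[] args) {
        // Original setup: subscriber on the default stream, emit on a named stream.
        StreamRoutingSketch original = new StreamRoutingSketch();
        original.subscribe("Message-Handling-Bolt", "FileBeat-Bolt");
        System.out.println(original.emit("FileBeat-Bolt", "Message-Handling-Bolt")); // []

        // Fix 1: emit without a stream id, i.e. on the default stream.
        System.out.println(original.emit("FileBeat-Bolt", DEFAULT_STREAM)); // [Message-Handling-Bolt]

        // Fix 2: subscribe to the named stream explicitly.
        StreamRoutingSketch named = new StreamRoutingSketch();
        named.subscribe("Message-Handling-Bolt", "FileBeat-Bolt", "Message-Handling-Bolt");
        System.out.println(named.emit("FileBeat-Bolt", "Message-Handling-Bolt")); // [Message-Handling-Bolt]
    }
}
```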
Edit: In response to your comment, the easiest solution is to drop the first parameter in your emit call:
collector.emit(input, new Values(topic, message));
If for some reason you don't want to do that and want to name the stream explicitly, you need to declare that your FileBeatMessageBolt emits on the Message-Handling-Bolt stream, and keep the two-argument shuffleGrouping subscription shown above. You declare the stream in your declareOutputFields implementation:
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declareStream("Message-Handling-Bolt", new Fields(...));
}
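With a named stream, the stream id has to agree in three places: the `declareStream` call in FileBeatMessageBolt, the `collector.emit` call, and the downstream `shuffleGrouping("FileBeat-Bolt", streamId)`. The toy check below is plain Java, not a Storm API (`NamedStreamCheck` and its messages are hypothetical); it lists the mismatches for the case where only `declareStream` is added and for the case where the grouping is changed as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy consistency check (NOT a Storm API): for a named stream, the id used in
// declareStream, in collector.emit and in the downstream grouping must match.
public class NamedStreamCheck {

    // Returns one message per mismatch; an empty list means the wiring is consistent.
    public static List<String> problems(Set<String> declaredStreams,
                                        String emitStream,
                                        String subscribedStream) {
        List<String> out = new ArrayList<>();
        if (!declaredStreams.contains(emitStream)) {
            out.add("emit uses undeclared stream '" + emitStream + "'");
        }
        if (!declaredStreams.contains(subscribedStream)) {
            out.add("subscriber listens on undeclared stream '" + subscribedStream + "'");
        }
        if (!emitStream.equals(subscribedStream)) {
            out.add("emit stream '" + emitStream + "' != subscribed stream '" + subscribedStream + "'");
        }
        return out;
    }

    public static void main(String[] args) {
        String named = "Message-Handling-Bolt";
        // Only declareStream(named, ...) added; the subscriber still uses the default stream.
        System.out.println(problems(Set.of(named), named, "default"));
        // declareStream(named, ...) plus shuffleGrouping("FileBeat-Bolt", named).
        System.out.println(problems(Set.of(named), named, named)); // []
    }
}
```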