Search code examples
javaapache-storm

How to process different tuples in the same bolt (Storm)


I have 2 Storm Bolts that emit different Values (defined be the declareOutputFields)

1)

    declarer.declare(new Fields("id", "bool"));
    basicOutputCollector.emit(new Values(id, true));
    declarer.declare(new Fields("id", "string"));
    basicOutputCollector.emit(new Values(id, "somestring"));

And I want to have a Bolt that functions as a Logger, that logs something if either the boolean value arrives, or the string value arrives.

The (simplified) Topology looks like this:

    var topologyBuilder = new TopologyBuilder();
    topologyBuilder.setSpout("spout")
    topologyBuilder.setBolt(BoolBolt, new BoolBolt()).fieldsGrouping("spout", new Fields("id"));
    topologyBuilder.setBolt(StringBolt, new StringBolt()).fieldsGrouping("spout", new Fields("id"));
    topologyBuilder.setBolt("LoggerBolt", new LoggerBolt())
            .fieldsGrouping("BoolBolt", new Fields("id"));
            .fieldsGrouping("StringBolt", new Fields("id"));
    return topologyBuilder.createTopology();

But if I now try to access the fields in the LoggerBolt, sometimes, the value is a boolean, and sometimes the value is a string because both BoolBolt and StringBolt emit Tuples to the LoggerBolt.

How can I hanlde this?

E.g. I would like to print "BOOL" if the Tuple ("id", "bool") arrives at the LoggerBolt and i want to print "STRING" if the Tuple ("id", "string") arrives at the logger.

Is there any possibility to check from what Bold a Tuple was emitted? Or is there anything I can check what Fields the Tuple containes?

And just checking if the tuples value is a String or not is not what I look for, as in example everything could be emitted!!!

Thanks in advance


Solution

  • You can examine the tuple object to find out which component it came from. Try the following in your LoggerBolt:

    String from_bolt = tuple.getSourceComponent();
    if ("BoolBolt".equals(from_bolt)) {
        LOG.info("BOOL");
    } else if ("StringBolt".equals(from_bolt)) {
        LOG.info("STRING");
    }
    

    An alternative is to check, if the tuple contains a certain field, this is done with tuple.contains("field_name"). Both methods would work.