I'm working with Apache Storm with this topology:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("socketspout", new SocketSpout(IP_HOST,PORT));
builder.setBolt("filterone", new FilterOne()).shuffleGrouping("socketspout");
builder.setBolt("filtertwo", new FilterTwo()).shuffleGrouping("filterone");
The methods of the first bolt are (FilteOne), this class extends BaseRichBolt:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("ID1","signal1"));
}
public void execute(Tuple input) {
int sig;
try {
sig=input.getInteger(1)*2;
System.out.println("Filter one.."+Integer.toString(sig));
collector.emit("ack1", new Values(input.getString(0), sig));
collector.ack(input);
} catch (Exception e) {
collector.fail(input);
}
}
The methods of the second bolt are (FilteTwo), this class extends BaseRichBolt too:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
public void execute(Tuple input) {
int sig;
try {
sig=input.getInteger(1)+1;
System.out.println("Filter two.."+Integer.toString(sig));
collector.ack(input);
} catch (Exception e) {
collector.fail(input);
}
}
When execute the program mode localcluster I can see the first bolt emit the tuple but the second never receive the tuple......
The problem was solved modifying the filter one code from collector.emit("ack1", new Values(input.getString(0), sig));
to collector.emit( new Values(input.getString(0), sig));