apache-kafka apache-storm esper bolt spout

How to send multiple (different) tuples from one KafkaSpout at once to the bolt?


I am a novice in Apache Storm.

I am trying to develop a real-time stream processing system using Apache Kafka, Storm and ESPER CEP engine.

For that, I have one KafkaSpout that emits streams to Bolts (which run my CEP queries) to filter the stream.

I have already created a topology and I am trying to run it on a local cluster.

The problem is that the CEP query running in my bolts requires batches of tuples to perform window operations on the streams. In my topology, KafkaSpout sends only one tuple at a time to the Bolts for processing, so my CEP query is not working as expected.

I am using the default KafkaSpout in Storm. Is there any way I can send multiple different tuples at once to the Bolts? Can some configuration tuning do this, or do I need to write a custom KafkaSpout?

Please help!!

My topology:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("KafkaSpout", new KafkaSpout<>(KafkaSpoutConfig.builder("localhost:" + 9092, "weatherdata").setProp(ConsumerConfig.GROUP_ID_CONFIG, "weather-consumer-group").build()),4);

builder.setBolt("A", new FeatureSelectionBolt(), 2).globalGrouping("KafkaSpout");

builder.setBolt("B", new TrendDetectionBolt(), 2).shuffleGrouping("A");

I am using 2 Bolts and one spout.

My Esper query running in Bolt A is:

select first(e), last(e) from weatherEvent.win:length(3) as e

Here I am trying to get the first and last event from a window of length three over the event stream. But I get the same event for both first and last, because KafkaSpout is sending only one tuple at a time.


Solution

  • The spout can't do it, but you can either use Storm's windowing support (https://storm.apache.org/releases/2.0.0-SNAPSHOT/Windowing.html) or write an aggregation bolt and put it between the spout and the rest of the topology.

    So your topology should be spout -> aggregator -> feature selection -> trend detection.

    I'd recommend you try out the built-in windowing support, but if you would rather write your own aggregation, your bolt really just needs to receive some number of tuples (e.g. 3), and emit a new tuple containing all the values.
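For the built-in windowing route, a minimal sketch might look like the following. It assumes Storm 2.x (the `prepare` signature differs in 1.x), and the class name `BatchingWindowBolt` and the output field name `batch` are made up for illustration. Storm collects the tuples and hands them to `execute` as one window, so the bolt only has to repackage them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;

// Hypothetical bolt: emits one tuple per window, carrying the values of every tuple in it.
public class BatchingWindowBolt extends BaseWindowedBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow window) {
        // window.get() returns all tuples currently in the window.
        List<Object> batch = new ArrayList<>();
        for (Tuple t : window.get()) {
            batch.add(t.getValues());
        }
        // Windowed bolts anchor emits to the window's tuples and ack them
        // when they expire, so no explicit ack is needed here.
        collector.emit(new Values(batch));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("batch"));
    }
}
```

It could then be wired in with something like `builder.setBolt("aggregator", new BatchingWindowBolt().withTumblingWindow(BaseWindowedBolt.Count.of(3)), 1).globalGrouping("KafkaSpout");`. A tumbling window yields non-overlapping batches of 3; if overlapping batches are wanted instead, `withWindow(Count.of(3), Count.of(1))` should give a sliding window that advances one tuple at a time.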

    If you write your own, the aggregator bolt should do something like:

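    import java.util.ArrayList;
    import java.util.List;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    // Inside the aggregator bolt class.
    // Assumes `collector` is the OutputCollector saved in prepare().
    private final List<Tuple> buffered = new ArrayList<>();

    @Override
    public void execute(Tuple input) {
      if (buffered.size() < 2) {
        buffered.add(input);  // hold the tuple un-acked until the batch is complete
        return;
      }
      Tuple first = buffered.get(0);
      Tuple second = buffered.get(1);
      Values aggregate = new Values(first.getValues(), second.getValues(), input.getValues());
      // Anchor the output to all three inputs so a downstream failure replays them.
      List<Tuple> anchors = List.of(first, second, input);
      collector.emit(anchors, aggregate);
      // OutputCollector.ack takes one tuple at a time.
      for (Tuple t : anchors) {
        collector.ack(t);
      }
      buffered.clear();
    }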
    

    This way you end up with one tuple containing the contents of the 3 input tuples.
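
The buffering itself does not depend on Storm, so it can be pulled into a small plain-Java class and tested without a running topology. The following is only a sketch: `TupleBatcher` and its `offer` method are hypothetical names, not part of Storm. The bolt would create one in `prepare()`, call `offer(input)` from `execute`, and emit and ack whenever a batch comes back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not part of Storm: collects items into fixed-size batches.
final class TupleBatcher<T> {
    private final int batchSize;
    private final List<T> buffered = new ArrayList<>();

    TupleBatcher(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        this.batchSize = batchSize;
    }

    // Adds one item. Returns a full batch once batchSize items have arrived,
    // otherwise an empty Optional.
    Optional<List<T>> offer(T item) {
        buffered.add(item);
        if (buffered.size() < batchSize) {
            return Optional.empty();
        }
        List<T> batch = new ArrayList<>(buffered);  // copy before clearing the buffer
        buffered.clear();
        return Optional.of(batch);
    }
}
```

With a batch size of 3, the first two calls to `offer` return an empty `Optional` and the third returns the three items in arrival order, after which the buffer starts over.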