Search code examples
apache-kafkaapache-storm

Multiple storm topologies consuming from a single Kafka topic


The performance tuning documentation provided by Storm states for the absolute best performance scaling multiple parallel topologies can yield better performance than simply scaling workers.

I am try to benchmark this theory against scaling worker.

However, using version 1.2.1 the storm Kafka spout is not behaving as I would have expected across multiple different topologies.

Setting a common client.id and group.id for the kafka spout consumer across all topologies for a single topic, each topology still subscribes to all available partitions and duplicate tuples, with errors being thrown as already committed tuples are recommitted.

I am surprised by this behaviour as I assumed that the consumer API would support this fairly simple use case.

I would be really grateful if somebody would explain

  1. what's the implementation logic of this behaviour with the kafka spout?
  2. any way around this problem?

Solution

  • The default behavior for the spout is to assign all partitions for a topic to workers in the topology, using the KafkaConsumer.assign API. This is the behavior you are seeing. With this behavior, you shouldn't be sharing group ids between topologies.

    If you want finer control over which partitions are assigned to which workers or topologies, you can implement the TopicFilter interface, and pass it to your KafkaSpoutConfig. This should let you do what you want.

    Regarding running multiple topologies being faster, I'm assuming you're referring to this section from the docs: In multiworker mode, messages often cross worker process boundaries. For performance sensitive cases, if it is possible to configure a topology to run as many single-worker instances [...] it may yield significantly better throughput and latency. The objective here is to avoid sending messages between workers, and instead keep each partition's processing internal in one worker. If you want to avoid running many topologies, you could look at customizing the Storm scheduler to make it allocate e.g. one full copy of your pipeline in each worker. That way, if you use localOrShuffleGrouping, there will always be a local bolt to send to, so you don't have to go over the network to another worker.