Tags: multithreading, apache-kafka, kafka-producer-api, apache-kafka-streams

Optimizing a Kafka Streams Application with Multiple Sub-Topologies


I'm running a Kafka Streams application with three sub-topologies. The stages of activity are roughly as follows:

  1. stream Topic A
  2. selectKey and repartition Topic A to Topic B
  3. stream Topic B
  4. foreach Topic B to Topic C Producer
  5. stream Topic C
  6. Topic C to Topic D

Topics A, B, and C are each materialized, which means that if each topic has 40 partitions, my maximum parallelism is 120.
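
A rough sketch of what such a topology might look like in the Streams DSL is below. The topic names, the key-extraction logic, and the separately managed producer used in the foreach step are assumptions based on the description above, not actual application code.

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;

    public class ThreeStageTopology {

        // Topic names are placeholders for the A/B/C/D topics described above.
        static Topology build(KafkaProducer<String, String> producerToC) {
            StreamsBuilder builder = new StreamsBuilder();

            // Sub-topology 1: stream topic A, re-key, and write to repartition topic B
            KStream<String, String> streamA = builder.stream("topic-A");
            streamA.selectKey((key, value) -> value.substring(0, 1)) // placeholder key logic
                   .to("topic-B");

            // Sub-topology 2: stream topic B and forward each record to topic C
            // through a separately managed producer (step 4 above)
            builder.<String, String>stream("topic-B")
                   .foreach((key, value) ->
                           producerToC.send(new ProducerRecord<>("topic-C", key, value)));

            // Sub-topology 3: stream topic C and write it on to topic D
            builder.<String, String>stream("topic-C").to("topic-D");

            return builder.build();
        }
    }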

At first I was running 5 instances of the Streams application with 8 threads apiece. With this setup I was experiencing inconsistent performance. Some sub-topologies sharing the same thread seemed hungrier for CPU than others, and after a while I'd get this error: Member [client_id] in group [consumer_group] has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator). Everything would then get rebalanced, which could lead to degraded performance until the next failure and rebalance.
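
For context, the 8-threads-per-instance setup is controlled by the num.stream.threads config. A minimal sketch (the application id and bootstrap servers are placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // placeholder id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder brokers
    // 8 stream threads per instance; with 5 instances that is 40 threads in total
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);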

My questions are as follows:

  1. How can multiple sub-topologies be run on a single thread? Via a poll queue?
  2. How does each thread decide how to allocate compute resources to each of its sub-topologies?
  3. How do you optimize your thread-to-topic-partition ratio in such cases to avoid periodic consumer failures? For example, will a 1:1 ratio ensure more consistent performance?
  4. If you use a 1:1 ratio, how do you ensure that every thread gets assigned its own topic-partition and some threads aren't left idle?

Solution

    1. The thread will poll() for the topics of all its sub-topologies and check each record's topic metadata to route it to the correct task.

    2. Each sub-topology is treated the same; i.e., the available resources are distributed evenly across them, so to speak.

    3. A 1:1 ratio is only useful if you have enough cores. I would recommend monitoring your CPU utilization. If it's too high (say, above 80%), you should add more cores/threads.

    4. Kafka Streams handles this for you automatically.

    A couple of general comments:

    • You might consider increasing the max.poll.interval.ms config to avoid a consumer dropping out of the group (a config sketch follows these bullets).
    • You might consider decreasing max.poll.records to get fewer records per poll() call, and thus decrease the time between two consecutive calls to poll().
    • Note that decreasing max.poll.records does not increase network/broker communication -- if a single fetch request returns more records than the max.poll.records config allows, the data is simply buffered within the consumer and the next poll() is served from that buffer, avoiding a broker round trip.
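
    If you decide to adjust those settings, here is a rough sketch of how they can be overridden through the Streams configuration using the consumer prefix; the values shown are arbitrary examples, not tuned recommendations:

        import java.util.Properties;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.streams.StreamsConfig;

        Properties props = new Properties();
        // Allow more time between two consecutive poll() calls before the member
        // is removed from the group
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600000);
        // Return fewer records per poll() so consecutive poll() calls happen more often
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);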