Search code examples
apache-kafka-streams

Consumer assignment with multiple topics with Kafka Streams


Apologies if this has been already covered before here, I couldn't find anything closely related. I have this Kafka Streams app which reads from multiple topics, persist the records on a DB and then publish an event to an output topic. Pretty straightforward, it's stateless in terms of kafka local stores. (Topology below)

Topic1(T1) has 5 partitions, Topic2(T2) has a single partition. The issue here is, while consuming from two topics, if I want to go "full speed" with T1 (5 consumers), it doesn't guarantee that I will have dedicated consumers for each partition on T1. It will be distributed within the two topic partitions and I might end up with unbalanced consumers (and idle consumers), something like below:

  • [c1: t1p1, t1p3], [c2: t1p2, t1p5], [c3: t1p4, t2p1], [c4: (idle consumer)], [c5: (idle consumer)]
  • [c1: t1p1, t1p2], [c2: t1p5], [c3: t1p4, t2p1], [c4: (idle consumer)], [c5: t1p3]

With that said:

  1. Is it a good practice having a topology that reads from multiple topics within the same KafkaStreams instance?

  2. Is there any way to achieve a partition assignment like the following if I want go "full speed" for T1? [c1: t1p1, t2p1], [c2: t1p2], [c3: t1p3], [c4: t1p4], [c5: t1p5]

  3. Which of the topologies below is most optimal to what I want to achieve? Or is it completely unrelated?

Option A (Current topology)

Topologies:
   Sub-topology: 0
    Source: topic1-source (topics: [TOPIC1])
      --> topic1-processor
    Processor: topic1-processor (stores: [])
      --> topic1-sink
      <-- topic1-source
    Sink: topic1-sink (topic: OUTPUT-TOPIC)
      <-- topic1-processor

  Sub-topology: 1
    Source: topic2-source (topics: [TOPIC2])
      --> topic2-processor
    Processor: topic2-processor (stores: [])
      --> topic2-sink
      <-- topic2-source
    Sink: topic2-sink (topic: OUTPUT-TOPIC)
      <-- topic2-processor

Option B:

Topologies:
   Sub-topology: 0
    Source: topic1-source (topics: [TOPIC1])
      --> topic1-processor
    Source: topic2-source (topics: [TOPIC2])
      --> topic2-processor
    Processor: topic1-processor (stores: [])
      --> response-sink
      <-- topic1-source
    Processor: topic2-processor (stores: [])
      --> response-sink
      <-- topic2-source
    Sink: response-sink (topic: OUTPUT-TOPIC)
      <-- topic2-processor, topic1-processor
  1. If I use two streams for each topic instead of a single streams with multiple topic, would that work for what I am trying to achieve?
config1.put("application.id", "app1");
KakfaStreams stream1 = new KafkaStreams(config1, topologyTopic1);
stream1.start();

config2.put("application.id", "app2");
KakfaStreams stream2 = new KafkaStreams(config2, topologyTopic2);
stream2.start();

Solution

  • The initial assignments you describe, would never happen with Kafka Streams (And also not with any default Consumer config). If there are 5 partitions and you have 5 consumers, each consumer would get 1 partition assigned (for a plain consumer with a custom PartitionAssignor you could do the assignment differently, but all default implementations would ensure proper load balancing).

    Is it a good practice having a topology that reads from multiple topics within the same KafkaStreams instance?

    There is not issue with that.

    Is there any way to achieve a partition assignment like the following if I want go "full speed" for T1? [c1: t1p1, t2p1], [c2: t1p2], [c3: t1p3], [c4: t1p4], [c5: t1p5]

    Depending how you write your topology, this would be the assignment Kafka Streams uses out-of-the-box. For you two options, option B would result in this assignment.

    Which of the topologies below is most optimal to what I want to achieve? Or is it completely unrelated?

    As mentioned above, Option B would result in the assignment above. For Option A, you could actually even use a 6th instance and each instance would processes exactly one partition (because there are two sub-topologies, you get 6 tasks, 5 for sub-topology-0 and 1 for sub-topology-1; sub-topologies are scaled out independently of each other); for Option A, you only get 5 tasks though because there is only one sub-topology and thus the maximum number of partitions of both input topic (that is 5) determines the number of tasks.

    If I use two streams for each topic instead of a single streams with multiple topic, would that work for what I am trying to achieve?

    Yes, it would be basically the same as Option A -- however, you get two consumer groups and thus "two application" instead of one.