
Consuming a Kafka batch from multiple partitions


I understand that a Kafka consumer can pull events in batches. I am trying to understand this scenario:

  • I have 4 partitions for a topic
  • I have 1 consumer, to which Kafka assigns all 4 partitions.
  • Let's assume every batch the Kafka client pulls from Kafka contains 5 messages.

What I'm trying to understand is whether the events in one batch all come from the same partition, with the client then moving round-robin to the next partition's batch, or whether a single batch can already contain events from different partitions.


Solution

  • I can't give you a precise answer, but I found it interesting enough to test it out.

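    For this, I created a topic with four partitions and used the kafka-producer-perf-test command-line tool to produce some messages into it. The performance test tool does not set any keys, so the producer's default partitioner decides where each message goes: older clients distribute keyless messages round-robin, while clients since Kafka 2.4 use a "sticky" partitioner that fills a whole batch for one partition before switching to another.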

    kafka-producer-perf-test --topic test --num-records 1337 --throughput -1 --record-size 128 --producer-props key.serializer=org.apache.kafka.common.serialization.StringSerializer --producer-props value.serializer=org.apache.kafka.common.serialization.StringSerializer --producer-props bootstrap.servers=localhost:9092
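    To make the partitioning step concrete, here is a toy model (the class PartitionerModel is hypothetical, not Kafka code) of two ways keyless records can be spread over 4 partitions: plain round-robin, and a sticky strategy that sends a whole producer batch to one partition. The batch size of 16 records is an arbitrary assumption; Kafka's real sticky partitioner sizes batches in bytes (batch.size) and picks the next partition randomly or, in newer clients, based on broker load instead of simply rotating.

```java
import java.util.Arrays;

public class PartitionerModel {
    // Partition chosen for each keyless record in this toy model.
    // recordsPerBatch == 1 gives plain round-robin (next partition for every record);
    // a larger value keeps a whole producer batch on one partition, then rotates.
    static int[] assign(int numRecords, int numPartitions, int recordsPerBatch) {
        int[] partitions = new int[numRecords];
        for (int i = 0; i < numRecords; i++) {
            partitions[i] = (i / recordsPerBatch) % numPartitions;
        }
        return partitions;
    }

    // How many records ended up in each partition.
    static int[] countPerPartition(int[] assignment, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (int p : assignment) {
            counts[p]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(assign(8, 4, 1))); // [0, 1, 2, 3, 0, 1, 2, 3]
        System.out.println(Arrays.toString(assign(8, 4, 3))); // [0, 0, 0, 1, 1, 1, 2, 2]
        // 1337 records into 4 partitions, as in the perf-test command above
        System.out.println(Arrays.toString(countPerPartition(assign(1337, 4, 1), 4)));  // [335, 334, 334, 334]
        System.out.println(Arrays.toString(countPerPartition(assign(1337, 4, 16), 4))); // [336, 336, 336, 329]
    }
}
```

    Either way each partition receives roughly a quarter of the 1337 records; what differs is whether consecutive records alternate between partitions or arrive in runs.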
    

    Afterwards, I created a simple KafkaConsumer with max.poll.records=5 to match your question. The consumer simply prints the offset and partition of each message it consumes:

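    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    public class BatchTest {
        public static void main(String[] args) {
            Properties settings = new Properties();
            settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            settings.put(ConsumerConfig.GROUP_ID_CONFIG, "batch-test"); // hypothetical group id
            settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);   // at most 5 records per poll
            settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            int counter = 0;
            // consume messages with `poll` call and print out results
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
                consumer.subscribe(Arrays.asList("test"));
                while (true) {
                    System.out.printf("Batch = %d%n", counter);
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset = %d, partition = %d%n", record.offset(), record.partition());
                    }
                    counter += 1;
                }
            }
        }
    }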
    

    The result, answering your question, is that in this test the consumer returned as many records as possible from one partition before moving on to the next. Only when all remaining messages from partition 1 had been consumed and the max.poll.records limit of 5 had not been reached yet did a batch also include messages from partition 2 (batch 87: three from partition 1 and two from partition 2).

    Here is some of the output for a better understanding:

    Batch = 0
    offset = 310, partition = 0
    offset = 311, partition = 0
    offset = 312, partition = 0
    offset = 313, partition = 0
    offset = 314, partition = 0
    
    Batch = 1
    offset = 315, partition = 0
    offset = 316, partition = 0
    offset = 317, partition = 0
    offset = 318, partition = 0
    offset = 319, partition = 0
    
    # only offsets with partition 0
    
    Batch = 45
    offset = 525, partition = 0
    offset = 526, partition = 0
    offset = 527, partition = 0
    offset = 528, partition = 0
    offset = 529, partition = 0
    
    Batch = 46
    offset = 728, partition = 1
    offset = 729, partition = 1
    offset = 730, partition = 1
    offset = 731, partition = 1
    offset = 732, partition = 1
    
    # only offsets with partition 1
    
    Batch = 86
    offset = 928, partition = 1
    offset = 929, partition = 1
    offset = 930, partition = 1
    offset = 931, partition = 1
    offset = 932, partition = 1
    
    Batch = 87
    offset = 465, partition = 2
    offset = 466, partition = 2
    offset = 933, partition = 1
    offset = 934, partition = 1
    offset = 935, partition = 1
    
    Batch = 88
    offset = 467, partition = 2
    offset = 468, partition = 2
    offset = 469, partition = 2
    offset = 470, partition = 2
    offset = 471, partition = 2
    
    # and so on
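    The observed pattern can be reproduced with a small in-memory model. It rests on one documented property: max.poll.records does not change how the consumer fetches data, it only caps how many of the already-fetched records a single poll() returns. The rest is a simplifying assumption, not Kafka's actual fetcher code: buffered records are kept per partition and drained partition by partition. PollModel, Rec and addFetchedData are hypothetical names, and the offsets mirror batches 86 to 88 above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PollModel {
    // One returned record: the partition it came from and its offset.
    record Rec(int partition, long offset) {}

    // Offsets already fetched but not yet returned by poll(), kept per partition
    // in the order the fetches were added (simplified stand-in for the fetch buffer).
    private final Map<Integer, Deque<Long>> buffered = new LinkedHashMap<>();

    void addFetchedData(int partition, long firstOffset, int count) {
        Deque<Long> queue = buffered.computeIfAbsent(partition, p -> new ArrayDeque<>());
        for (long o = firstOffset; o < firstOffset + count; o++) {
            queue.addLast(o);
        }
    }

    // Return at most maxPollRecords records: drain the first partition that still
    // has buffered data, and only move on to the next one if the limit is not reached.
    List<Rec> poll(int maxPollRecords) {
        List<Rec> batch = new ArrayList<>();
        Iterator<Map.Entry<Integer, Deque<Long>>> it = buffered.entrySet().iterator();
        while (batch.size() < maxPollRecords && it.hasNext()) {
            Map.Entry<Integer, Deque<Long>> entry = it.next();
            Deque<Long> queue = entry.getValue();
            while (batch.size() < maxPollRecords && !queue.isEmpty()) {
                batch.add(new Rec(entry.getKey(), queue.pollFirst()));
            }
            if (queue.isEmpty()) {
                it.remove(); // this partition's buffered data is fully returned
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        PollModel consumer = new PollModel();
        consumer.addFetchedData(1, 928, 8); // partition 1: offsets 928-935
        consumer.addFetchedData(2, 465, 7); // partition 2: offsets 465-471
        for (int i = 0; i < 3; i++) {
            System.out.println(consumer.poll(5));
        }
    }
}
```

    Running main prints three batches that group the same records as batches 86, 87 and 88: five records from partition 1, then the remaining three of partition 1 plus two of partition 2, then five of partition 2. The model does not reproduce the ordering inside batch 87, where the real consumer listed partition 2 first; if your code needs to handle each partition separately, iterating records.partitions() and records.records(partition) on the returned ConsumerRecords avoids depending on that order.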