apache-kafka, kafka-consumer-api

How does one Kafka consumer read from more than one partition?


I would like to know how a single consumer consumes from more than one partition. Specifically, in what order are messages read from the different partitions?

I had a peek at the source code (Consumer, Fetcher) but I can't really follow all of it.

This is what I thought would happen:

Partitions are read sequentially. That is: all the messages in one partition will be read before continuing to the next. If we reach max.poll.records without consuming the whole partition, the next fetch will continue reading the current partition until it is exhausted, before going on to the next.

I tried setting max.poll.records to a relatively low number and seeing what happens. If I send messages to a topic and then start a consumer, all the messages are read from one partition before continuing to the next, even if the number of messages in that partition is higher than max.poll.records.

Then I tried to see if I could "lock" the consumer in one partition, by sending messages to that partition continuously (using JMeter). But I couldn't do it: messages from other partitions were also being read.


Solution

  • I have read the KIP mentioned in the answer to the question linked in the comments, and I think I finally understand how the consumer works.

    There are two main configuration options that affect how data is consumed:

    • max.partition.fetch.bytes: the maximum amount of data per partition that the server will return in a fetch response

    • max.poll.records: the maximum number of records returned by a single call to poll
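
As a rough illustration of where these two options go, here is a minimal sketch that only builds the configuration with `java.util.Properties`. The broker address, group id, class name `ConsumerConfigSketch`, and the chosen values (65536 and 10) are assumptions made for the example, not recommendations.

```java
import java.util.Properties;

class ConsumerConfigSketch {
    // Builds consumer properties. Broker address and group id are placeholders.
    static Properties buildProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", "demo-group");              // hypothetical group id
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Cap on the data returned per partition in one fetch (Kafka's default is 1048576 bytes).
        props.setProperty("max.partition.fetch.bytes", "65536");
        // Cap on the records returned by one poll() call (Kafka's default is 500).
        props.setProperty("max.poll.records", "10");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration.
        buildProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With the Kafka client library on the classpath, these properties could then be passed to `new KafkaConsumer<>(props)`.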


    The process of fetching from each partition is greedy and proceeds in a round-robin way. Greedy means that as many records as possible are retrieved from each partition: if all the records in a partition occupy less than max.partition.fetch.bytes, all of them are fetched; otherwise, only max.partition.fetch.bytes worth of records is fetched.

    Not all the fetched records are returned by a single poll call, though. At most max.poll.records records are returned.

    The remaining records will be retained for the next call to poll. Moreover, if the number of retained records is less than max.poll.records, the poll method will start a new round of fetching (pre-fetching) before returning. This means that, usually, the consumer is processing records while new records are being fetched.
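
The fetch-then-poll behaviour described above can be sketched as a toy model using only the Java standard library. This is not the real Fetcher: the class `PollSimulation` and its methods are hypothetical, `fetchCap` counts records instead of bytes, and a fetch round runs at the start of each poll for every partition with nothing buffered, whereas the real client pre-fetches in the background.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PollSimulation {
    // Records not yet fetched, per partition (stands in for the broker side).
    private final Map<String, Deque<String>> log = new LinkedHashMap<>();
    // Records fetched but not yet returned by poll(), oldest batch first.
    private final Map<String, Deque<String>> buffered = new LinkedHashMap<>();
    private final Map<String, Integer> nextOffset = new LinkedHashMap<>();
    private final int fetchCap;       // stand-in for max.partition.fetch.bytes, in records
    private final int maxPollRecords; // stand-in for max.poll.records

    PollSimulation(int fetchCap, int maxPollRecords) {
        this.fetchCap = fetchCap;
        this.maxPollRecords = maxPollRecords;
    }

    // Appends `count` records named "<partition>-<offset>" to a partition.
    void produce(String partition, int count) {
        Deque<String> records = log.computeIfAbsent(partition, p -> new ArrayDeque<>());
        int offset = nextOffset.getOrDefault(partition, 0);
        for (int i = 0; i < count; i++) {
            records.add(partition + "-" + offset++);
        }
        nextOffset.put(partition, offset);
    }

    List<String> poll() {
        // Fetch round: greedily take up to fetchCap records from every
        // partition that currently has nothing buffered.
        for (Map.Entry<String, Deque<String>> e : log.entrySet()) {
            if (buffered.containsKey(e.getKey()) || e.getValue().isEmpty()) {
                continue;
            }
            Deque<String> batch = new ArrayDeque<>();
            while (batch.size() < fetchCap && !e.getValue().isEmpty()) {
                batch.add(e.getValue().poll());
            }
            buffered.put(e.getKey(), batch);
        }
        // Return at most maxPollRecords, draining the oldest buffered batch first.
        List<String> out = new ArrayList<>();
        Iterator<Deque<String>> it = buffered.values().iterator();
        while (it.hasNext() && out.size() < maxPollRecords) {
            Deque<String> batch = it.next();
            while (!batch.isEmpty() && out.size() < maxPollRecords) {
                out.add(batch.poll());
            }
            if (batch.isEmpty()) {
                it.remove(); // the partition becomes eligible for the next fetch round
            }
        }
        return out;
    }

    public static void main(String[] args) {
        PollSimulation sim = new PollSimulation(2, 3);
        sim.produce("A", 5);
        sim.produce("B", 3);
        List<String> batch;
        while (!(batch = sim.poll()).isEmpty()) {
            System.out.println(batch);
        }
    }
}
```

With 5 records in A, 3 in B, a cap of 2 records per partition per fetch, and at most 3 records per poll, this prints `[A-0, A-1, B-0]`, then `[B-1, A-2, A-3]`, then `[A-4, B-2]`. Records left over from one poll are returned first by the next one, and partitions are interleaved rather than drained one at a time.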


    If some partitions receive considerably more messages than others, this could lead to the less active partitions not being processed for long periods of time. The KIP describes this downside as follows:

    "The only downside to this approach is that it could lead to some partitions going unconsumed for an extended amount of time when there is a large imbalance between the partition's respective message rates. For example, suppose that a consumer with max messages set to 1 fetches data from partitions A and B. If the returned fetch includes 1000 records from A and no records from B, the consumer will have to process all 1000 available records from A before fetching on partition B again."

    To mitigate this, we could reduce max.partition.fetch.bytes, so that less data from a busy partition is buffered before the other partitions are fetched again.
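
To get a feel for the numbers, here is a back-of-the-envelope sketch. The class `StarvationEstimate`, its helper methods, and the fixed average record size of 1024 bytes are assumptions made for illustration. Real fetch sizes also depend on batching and compression, so treat the results as rough estimates only.

```java
class StarvationEstimate {
    // Number of poll() calls needed to hand out `bufferedRecords`
    // when each call returns at most `maxPollRecords`.
    static long pollsToDrain(long bufferedRecords, int maxPollRecords) {
        return (bufferedRecords + maxPollRecords - 1) / maxPollRecords; // ceiling division
    }

    // Rough number of records one partition contributes to a single fetch,
    // assuming every record has the same size. At least one is always assumed.
    static long recordsPerFetch(int maxPartitionFetchBytes, int avgRecordBytes) {
        return Math.max(1, maxPartitionFetchBytes / avgRecordBytes);
    }

    public static void main(String[] args) {
        // The KIP example: 1000 buffered records from A, one record per poll.
        System.out.println(pollsToDrain(1000, 1)); // 1000

        int avgRecordBytes = 1024; // assumed average record size
        int maxPollRecords = 10;   // assumed max.poll.records

        // Default max.partition.fetch.bytes (1048576) versus a reduced value (65536).
        long withDefault = pollsToDrain(recordsPerFetch(1048576, avgRecordBytes), maxPollRecords);
        long withReduced = pollsToDrain(recordsPerFetch(65536, avgRecordBytes), maxPollRecords);
        System.out.println(withDefault); // 103
        System.out.println(withReduced); // 7
    }
}
```

Under these assumptions, a single fetch from a busy partition occupies about 103 polls with the default limit but only about 7 with the reduced one, which is why lowering max.partition.fetch.bytes shortens the time other partitions have to wait.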