java · apache-kafka · kafka-consumer-api · spring-kafka

Not able to poll / fetch all records from kafka topic


I am trying to poll data from a specific topic. Kafka is receiving about 100 records/s, but most of the time the consumer does not fetch all of them. I am using a timeout of 5000 ms and calling this method every 100 ms. Note: I am subscribing to the specific topic too.

@Scheduled(fixedDelayString = "100")
public void pollRecords() {
    // poll() takes a Duration, not a String (the old poll(long) overload is deprecated)
    ConsumerRecords<String, String> records =
            leadConsumer.poll(Duration.ofMillis(5000));
    // ...
}

How can I fetch all the data from Kafka?


Solution

  • The maximum number of records returned from poll() is specified with the max.poll.records consumer config parameter (default: 500). There are also other consumer config parameters that limit the maximum amount of data returned from the server: fetch.max.bytes and max.partition.fetch.bytes.

    On the broker side, there is another size limit called message.max.bytes.

    So you should set these parameters appropriately to get more messages per poll.
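    As a minimal sketch of wiring these settings into a consumer, the snippet below builds the config as plain key/value properties. The property keys are the documented Kafka config names; the broker address, group id, and the numeric values are illustrative and should be tuned to your own message sizes and throughput:

    ```java
    import java.util.Properties;

    public class ConsumerTuning {

        static Properties consumerProps() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
            props.put("group.id", "lead-consumer-group");       // hypothetical group id
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            // Raise the per-poll record cap above the default of 500.
            props.put("max.poll.records", "2000");
            // Per-fetch and per-partition byte caps (defaults: 52428800 and 1048576).
            props.put("fetch.max.bytes", String.valueOf(100 * 1024 * 1024));
            props.put("max.partition.fetch.bytes", String.valueOf(10 * 1024 * 1024));
            return props;
        }

        public static void main(String[] args) {
            // These properties would be passed to new KafkaConsumer<>(consumerProps()).
            System.out.println(consumerProps().getProperty("max.poll.records")); // prints 2000
        }
    }
    ```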

    From Kafka docs (link):

    max.poll.records: The maximum number of records returned in a single call to poll(). (default: 500)

    fetch.max.bytes: The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel. (default: 52428800)

    message.max.bytes: The largest record batch size allowed by Kafka. If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that they can fetch record batches this large. In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case. This can be set per topic with the topic level max.message.bytes config. (default: 1000012)

    max.partition.fetch.bytes: The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). See fetch.max.bytes for limiting the consumer request size. (default: 1048576)
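    Since you are using spring-kafka, note that when the consumer is created by Spring Boot autoconfiguration (rather than by hand), the same settings can be supplied via application.properties. This is a sketch assuming Spring Boot's spring.kafka support; keys without a dedicated Spring property can be passed through the generic properties map, and the values here are illustrative:

    ```properties
    # Per-poll record cap (default 500)
    spring.kafka.consumer.max-poll-records=2000
    # Arbitrary Kafka consumer config keys via the pass-through map
    spring.kafka.consumer.properties.fetch.max.bytes=104857600
    spring.kafka.consumer.properties.max.partition.fetch.bytes=10485760
    ```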