Tags: spring-boot, apache-kafka, kafka-consumer-api, spring-kafka

fetch-min-size and max-poll-records Spring Kafka configurations do not work as expected


I am working on a Spring Boot application using Spring Kafka. It listens to a single Kafka topic, segregates the records into their respective categories, creates JSON files from them and uploads those files to AWS S3.

I am receiving huge data volumes on the Kafka topic, and I need to make sure the JSON files are chunked into large enough pieces to limit the number of files uploaded to S3.

Below is my application.yml configuration for the Kafka consumer:

spring:
  kafka:
    consumer:
      group-id: newton
      auto-offset-reset: earliest
      fetch-max-wait: 
        seconds: 1 
      fetch-min-size: 500000000
      max-poll-records: 50000000
      value-deserializer: com.forwarding.application.consumer.model.deserializer.MeasureDeserializer

I have created a listener that reads the topic continuously.

Even with the above configuration, I am receiving only small batches of records, as the console output shows:

   2019-03-27T15:25:56.02+0530 [APP/PROC/WEB/0] OUT 2019-03-27 09:55:56.024  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 56. No Of measures: 60
   2019-03-27T15:25:56.21+0530 [APP/PROC/WEB/2] OUT 2019-03-27 09:55:56.210  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 80. No Of measures: 96
   2019-03-27T15:25:56.56+0530 [APP/PROC/WEB/0] OUT 2019-03-27 09:55:56.560  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 76. No Of measures: 39
   2019-03-27T15:25:56.73+0530 [APP/PROC/WEB/2] OUT 2019-03-27 09:55:56.732  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 77. No Of measures: 66

Can anyone tell me what needs to be configured so that records are received according to the configuration in application.yml?


Solution

  • I copied your configuration (except for the max wait; see the syntax I used) and it worked fine. The values show up in the ConsumerConfig log:

    spring:
      kafka:
        consumer:
          group-id: newton
          auto-offset-reset: earliest
          fetch-max-wait: 1s
          fetch-min-size: 500000000
          max-poll-records: 50000000
    
    2019-03-27 13:43:55.454  INFO 98982 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.id = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 1000
        fetch.min.bytes = 500000000
        group.id = newton
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 50000000
        ...
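    To see why the batches can still be small even though these values are applied, it may help to spell out the documented semantics: the broker holds a fetch request until at least fetch.min.bytes of data are available or fetch.max.wait.ms has elapsed, whichever comes first, and max.poll.records is only an upper bound on how many records one poll() returns, not a minimum. The toy model below is a sketch under stated assumptions (a single partition with a steady, hypothetical arrival rate; it ignores the fetch.max.bytes and max.partition.fetch.bytes caps also visible in the log). It is not Kafka's broker code, and FetchModel and its methods are hypothetical names:

```java
/**
 * Toy model (NOT Kafka's broker code) of how fetch.min.bytes,
 * fetch.max.wait.ms and max.poll.records interact.
 * Assumption: one partition receiving data at a steady rate.
 */
public class FetchModel {

    /** Time (ms) until the broker answers: min bytes reached or max wait elapsed, whichever is first. */
    static long responseTimeMs(long bytesPerMs, long fetchMinBytes, long fetchMaxWaitMs) {
        if (bytesPerMs <= 0) {
            return fetchMaxWaitMs; // nothing arrives, so only the timeout can end the wait
        }
        long msToReachMin = (fetchMinBytes + bytesPerMs - 1) / bytesPerMs; // ceiling division
        return Math.min(msToReachMin, fetchMaxWaitMs);
    }

    /** Records one poll() hands over: whatever arrived, capped (never padded) by max.poll.records. */
    static long recordsPerPoll(long recordsAvailable, long maxPollRecords) {
        return Math.min(recordsAvailable, maxPollRecords);
    }

    public static void main(String[] args) {
        long fetchMinBytes = 500_000_000L; // fetch.min.bytes from the log
        long fetchMaxWaitMs = 1_000L;      // fetch.max.wait.ms from the log
        long maxPollRecords = 50_000_000L; // max.poll.records from the log

        // Hypothetical arrival rate: 100,000 bytes/s = 100 bytes per ms.
        long waited = responseTimeMs(100, fetchMinBytes, fetchMaxWaitMs);
        System.out.println("broker answers after " + waited + " ms"); // 1000: the timeout wins

        // Hypothetical: 60 records arrived during that window.
        System.out.println("records in this poll: " + recordsPerPoll(60, maxPollRecords)); // 60
    }
}
```

    In this model, unless data arrives fast enough to fill 500 MB within one second, the timeout always fires first, so each poll() returns only what arrived during that window.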
    

    You can set arbitrary properties that are not directly supported as Boot properties by using the ...properties property.

    e.g.

    spring:
      kafka:
        consumer:
          properties:
            max.poll.interval.ms: 300000
    

    or

    spring:
      kafka:
        consumer:
          properties:
            max:
              poll:
                interval:
                  ms: 300000
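    Both forms end up as the same Kafka property because the nested YAML keys are joined with dots. The sketch below imitates that flattening with plain java.util maps; it is an illustration under that assumption, not Spring Boot's actual binding code, and FlattenProperties and flatten are hypothetical names:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Illustration only (not Spring Boot's binder): flattens a nested
 * YAML-style map into the dotted keys Kafka expects, so that
 * max -> poll -> interval -> ms becomes "max.poll.interval.ms".
 */
public class FlattenProperties {

    static Map<String, String> flatten(Map<String, ?> nested) {
        Map<String, String> flat = new LinkedHashMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    private static void flattenInto(String prefix, Map<String, ?> node, Map<String, String> out) {
        for (Map.Entry<String, ?> e : node.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            Object value = e.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, ?> child = (Map<String, ?>) value;
                flattenInto(key, child, out); // descend one YAML level
            } else {
                out.put(key, String.valueOf(value)); // leaf: store under the dotted key
            }
        }
    }

    public static void main(String[] args) {
        // Nested form, as in the second example.
        Map<String, Object> nested =
                Map.of("max", Map.of("poll", Map.of("interval", Map.of("ms", 300000))));
        // Dotted form, as in the first example.
        Map<String, Object> dotted = Map.of("max.poll.interval.ms", 300000);

        System.out.println(flatten(nested));                         // {max.poll.interval.ms=300000}
        System.out.println(flatten(nested).equals(flatten(dotted))); // true
    }
}
```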
    

    From the Spring Boot reference documentation:

    The properties supported by auto configuration are shown in Appendix A, Common application properties. Note that, for the most part, these properties (hyphenated or camelCase) map directly to the Apache Kafka dotted properties. Refer to the Apache Kafka documentation for details.
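    The "for the most part" matters for the properties in this question. The sketch below lists the Boot-to-Kafka name pairs exactly as they appear in the question's application.yml and in the ConsumerConfig log earlier in this answer, and checks each against a naive hyphen-to-dot rename. BootToKafkaNames and naiveGuess are hypothetical names for illustration, not part of Spring Boot:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Boot property -> Kafka property pairs taken from the question's
 * application.yml and the ConsumerConfig log. Hypothetical helper,
 * not a Spring Boot API.
 */
public class BootToKafkaNames {

    static final Map<String, String> CONSUMER_KEYS = new LinkedHashMap<>();
    static {
        CONSUMER_KEYS.put("spring.kafka.consumer.group-id", "group.id");
        CONSUMER_KEYS.put("spring.kafka.consumer.auto-offset-reset", "auto.offset.reset");
        CONSUMER_KEYS.put("spring.kafka.consumer.max-poll-records", "max.poll.records");
        // Not a mechanical rename: "size" becomes "bytes".
        CONSUMER_KEYS.put("spring.kafka.consumer.fetch-min-size", "fetch.min.bytes");
        // A Duration in Boot (e.g. 1s), milliseconds in Kafka (1000 in the log).
        CONSUMER_KEYS.put("spring.kafka.consumer.fetch-max-wait", "fetch.max.wait.ms");
    }

    /** Naive guess: strip the Boot prefix and turn hyphens into dots. */
    static String naiveGuess(String bootKey) {
        return bootKey.replace("spring.kafka.consumer.", "").replace('-', '.');
    }

    public static void main(String[] args) {
        CONSUMER_KEYS.forEach((boot, kafka) -> {
            String kind = naiveGuess(boot).equals(kafka) ? "direct" : "renamed";
            System.out.println(boot + " -> " + kafka + " (" + kind + ")");
        });
    }
}
```

    Three of the five map directly; fetch-min-size and fetch-max-wait do not, which is why checking the logged ConsumerConfig values is a quick way to confirm what was actually applied.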

    The first few of these properties apply to all components (producers, consumers, admins, and streams) but can be specified at the component level if you wish to use different values. Apache Kafka designates properties with an importance of HIGH, MEDIUM, or LOW. Spring Boot auto-configuration supports all HIGH importance properties, some selected MEDIUM and LOW properties, and any properties that do not have a default value.

    Only a subset of the properties supported by Kafka are available directly through the KafkaProperties class. If you wish to configure the producer or consumer with additional properties that are not directly supported, use the following properties:

    spring.kafka.properties.prop.one=first

    spring.kafka.admin.properties.prop.two=second

    spring.kafka.consumer.properties.prop.three=third

    spring.kafka.producer.properties.prop.four=fourth

    spring.kafka.streams.properties.prop.five=fifth
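    A minimal sketch of how these maps layer for a consumer, assuming (as the text above describes for the shared properties) that a consumer-level value wins over one set under spring.kafka.properties, and that producer-, admin- and streams-level maps never reach the consumer. The class and method names and the prop.shared key are hypothetical; this is not Spring Boot's implementation:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch (not Spring Boot's code) of the layering described above:
 * spring.kafka.properties.* apply to every client, and
 * spring.kafka.consumer.properties.* apply to consumers only, winning on conflicts.
 */
public class ConsumerPropertyMerge {

    static Map<String, String> effectiveConsumerProps(Map<String, String> common,
                                                      Map<String, String> consumerOnly) {
        Map<String, String> merged = new HashMap<>(common); // start from the shared settings
        merged.putAll(consumerOnly);                        // consumer-level values override
        return merged;
    }

    public static void main(String[] args) {
        // spring.kafka.properties.* (prop.shared is a hypothetical conflicting key)
        Map<String, String> common = Map.of("prop.one", "first", "prop.shared", "common");
        // spring.kafka.consumer.properties.*
        Map<String, String> consumer = Map.of("prop.three", "third", "prop.shared", "consumer");

        Map<String, String> effective = effectiveConsumerProps(common, consumer);
        System.out.println(effective.get("prop.one"));    // first
        System.out.println(effective.get("prop.three"));  // third
        System.out.println(effective.get("prop.shared")); // consumer
    }
}
```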