Search code examples
apache-kafkaspring-cloud-streamspring-cloud-dataflow

Where are Kafka consumer properties set in an SCDF context?


I am trying to work myself around a CommitFailedException by tinkering with max.poll.interval.ms, session.timeout.ms and heartbeat.interval.ms. I have tried changing these values in the application.properties file, but when the app is deployed in an SCDF context, the values I set here are overridden somewhere else. When running the app locally as a stand-alone Spring Boot app, the property values are set to the same as specified in the application.properties file.

The dataflow-kafka container is based on the image confluentinc/cp-kafka:5.2.1

Where do I tweak Kafka consumer properties in an SCDF context?


Solution

  • The max.poll.interval.ms, session.timeout.ms and heartbeat.interval.ms appear to be Kafka's consumer configuration.

    If you intend to use and override them in your Spring Cloud Stream consumer, you'd have to spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar notation.

    In this example, where foo could be max.poll.interval.ms and bar would be its value: 300000. For more details, see ref. guide.

    Once when you have all the desired consumer properties defined in this pattern, you can test the application locally via java -jar. You can then assure when deployed from SCDF; it would run the same way, as well. Nothing should get overridden.