Search code examples
apache-kafkaintegration-testingspring-cloud-streamspring-kafka-testembedded-kafka

How can you verify immediately that a message was acknowledged when integration testing using Embedded Kafka in Spring Cloud Stream?


We use Spring Cloud Stream Kafka Binder (with Project Reactor integration, i.e. Flux streams) and manual offset commits (i.e. autoCommitOffset = false).

We are trying to write an integration test with Embedded Kafka from spring-kafka-test that's supposed to assert this all works, by manually reading the consumer group offset, using the admin client, before and after the test sends a message to our topic.

Tests fail intermittently. Using awaitility we are now waiting up to 10 seconds to poll the offset, and this seems to get around most of our issues, as the offset will change after around 7 seconds - but that's unsatisfactory for testing.

Is there a way to make sure Spring Cloud Stream Kafka Binder will write the offset change immediately once we manually acknowledge message receipt by calling Acknowledgement.acknowledge()?

Put differently: how can we verify acknowledge was called in our tests without having to wait?

We use Kotlin, Mockito and Mockito-kotlin and thus cannot use PowerMockito.


Solution

  • The problem is the Consumer is not thread safe. The commits have to be done on the container thread. If the consumer is sitting in poll() when you ack, you have to wait for up to pollTimeout before the offset will be committed.

    The default pollTimeout is 5 seconds.

    You can add a ListenerContainerCustomizer @Bean to modify the ContainerProperties.