Search code examples
kotlinspring-cloud-streamwiremockspring-retryretrytemplate

StreamRetryTemplate for Spring Cloud Streams not retrying in integration tests


We are utilizing Spring Cloud Streams that listen to a Kafka topic and call a rest service. We also implement a custom StreamRetryTemplate to specify what kind of errors we deem recoverable and which we do not. I cannot get consistent results between how it works at runtime and how it works in integration tests.

I've verified in debug mode that the exception is being thrown properly and that the RetryTemplate is being injected in properly but it just doesn't seem to be utilized in my integration tests.

@EnableBinding(Sink::class)
class MyListener(private val myService: Service) {

  @StreamListener(Sink.Input)
  fun consume(@Payload msg: MyMessage) = myService.process(msg)

  @SteamRetryTemplate
  fun getRetryTemplate() = RetryTemplate()
}

When I run this app and myService throws an exception I expect it to be retried, and it does so perfectly. But when I write integration tests with a wiremock server and have myService throw an exception it does not retry. I have assert statements to verify how many times my wiremock endpoint is hit.

Am I missing something specifically for retries to work in integration tests?


Solution

  • Are you using the test binder or the embedded kafka broker? The test binder is rather limited; using the embedded broker is preferred for full integration testing.

    See Testing Applications in the Spring for Apache Kafka Documentation.

    EDIT

    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class So55855151Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So55855151Application.class, args);
        }
    
        @StreamListener(Sink.INPUT)
        public void listen(String in) {
            System.out.println(in);
            throw new RuntimeException("fail");
        }
    
        @StreamRetryTemplate
        public RetryTemplate retrier() {
            return new RetryTemplate();
        }
    
    }
    
    spring.cloud.stream.bindings.input.group=input
    spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    @EmbeddedKafka
    public class So55855151ApplicationTests {
    
        @Autowired
        private KafkaTemplate<byte[], byte[]> template;
    
        @Autowired
        private RetryTemplate retrier;
    
        @Test
        public void test() throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(5);
            this.retrier.registerListener(new RetryListener() {
    
                @Override
                public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
                    System.out.println("open");
                    latch.countDown();
                    return true;
                }
    
                @Override
                public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
                        Throwable throwable) {
    
                    System.out.println("close");
                    latch.countDown();
                }
    
                @Override
                public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
                        Throwable throwable) {
    
                    System.out.println("onError: " + throwable);
                    latch.countDown();
                }
    
            });
    
            this.template.send("input", "test".getBytes());
            assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
        }
    
    }