Tags: java, spring-boot, apache-kafka, spring-kafka, spring-kafka-test

How to wrap a @KafkaListener that is handled by a SeekToCurrentErrorHandler with a latch for testing


I have a Kafka listener that throws an exception, which is then handled by a SeekToCurrentErrorHandler. For testing, I am trying to wrap the listener so that it counts down a latch I can assert on, but it does not seem to work. Can someone please suggest what I am missing?

Listener:

@KafkaListener(id = "testRetry", topics = "sr1", groupId = "retry-grp", containerFactory = "kafkaRetryListenerContainerFactory")
public void listen1(ConsumerRecord<String, Anky> record,
        @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery, Acknowledgment ack) throws UserException {
    // always fails, so every delivery is passed on to the error handler
    throw new UserException(code.getDbError());
}

Config:

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, Anky> kafkaRetryListenerContainerFactory(SeekToCurrentErrorHandler seekToCurrentErrorHandler) {

        ConcurrentKafkaListenerContainerFactory<String, Anky> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        // expose the delivery attempt count in the KafkaHeaders.DELIVERY_ATTEMPT header
        factory.getContainerProperties().setDeliveryAttemptHeader(true);
        factory.setErrorHandler(seekToCurrentErrorHandler);
        // stateful retry: each failure is reported back to the broker so that
        // the consumers remain alive while retrying
        factory.setStatefulRetry(true);
        factory.setConcurrency(2);
        return factory;
    }


    @Bean
    public SeekToCurrentErrorHandler seekToCurrentErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
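        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            // recoverer: intentionally does nothing once retries are exhausted
        });
        errorHandler.setCommitRecovered(true);
        // FixedBackOff(interval, maxAttempts): no delay between attempts, maxAttempts = 5
        errorHandler.setBackOffFunction((record, exception) -> new FixedBackOff(0L, 5L));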
        return errorHandler;
    }

Test:

@Test
public void testRetry() throws Exception {
    Anky msg = new Anky();
    this.template.send("sr1", "1234", msg);
    CountDownLatch latch = new CountDownLatch(1);
    ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) kafkaListenerEndpointRegistry
            .getListenerContainer("testRetry");
    container.stop();
    @SuppressWarnings("unchecked")
    AcknowledgingConsumerAwareMessageListener<String, Anky> messageListener = (AcknowledgingConsumerAwareMessageListener<String, Anky>) container
            .getContainerProperties().getMessageListener();

    container.getContainerProperties()
            .setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, Anky>() {

                @Override
                public void onMessage(ConsumerRecord<String, Anky> data, Acknowledgment acknowledgment,
                        Consumer<?, ?> consumer) {
                    messageListener.onMessage(data, acknowledgment, consumer);
                    latch.countDown();
                }

            });
    container.start();
    assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
    
}

Solution

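  • The delegate listener throws, so the latch.countDown() call after it is never reached. Wrap the delegate call in a try...finally block: the latch is then counted down even when the exception is thrown, and the exception still propagates to the error handler.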

    try {
        messageListener.onMessage(data, acknowledgment, consumer);
    }
    finally {
        latch.countDown();
    }
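
To see the effect in isolation, here is a minimal framework-free sketch. It assumes a plain java.util.function.Consumer as a stand-in for the Spring Kafka listener; the names LatchWrapperSketch and countingDown are hypothetical and only serve the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class LatchWrapperSketch {

    // Hypothetical helper: wraps a delegate so that the latch is counted down
    // whether the delegate returns normally or throws.
    static <T> Consumer<T> countingDown(Consumer<T> delegate, CountDownLatch latch) {
        return record -> {
            try {
                delegate.accept(record);
            } finally {
                // Runs even when the delegate throws; the exception still propagates.
                latch.countDown();
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        // Stand-in for the listener that always fails.
        Consumer<String> failing = record -> {
            throw new IllegalStateException("simulated failure for " + record);
        };

        Consumer<String> wrapped = countingDown(failing, latch);
        try {
            wrapped.accept("1234");
        } catch (IllegalStateException e) {
            // In the real container, the error handler would receive this exception.
        }

        // The latch was released despite the exception.
        System.out.println(latch.await(1, TimeUnit.SECONDS)); // prints "true"
    }
}
```

In the actual test the same try...finally goes inside the onMessage override, as shown in the solution. With a latch count of 1, the latch is released on the first delivery attempt.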