
How to use unique Kafka topic for each test run


When writing integration tests for my Spring Integration application, which uses Kafka as its message source, I often have trouble getting a clean state between two test cases, because the IntegrationFlow is configured to consume from a topic that the tests also need to use. It doesn't matter whether I use Testcontainers or EmbeddedKafka; in both cases the best practice is to use a unique topic for each test.

However, I don't know how to re-initialize the IntegrationFlow with a new topic between two tests.

Consider this flow:

    @Bean
    public StandardIntegrationFlow startKafkaInbound() {
        return IntegrationFlow.from(Kafka
                .messageDrivenChannelAdapter(
                        kafkaConsumerFactory,
                        ListenerMode.record,
                        serviceProperties.getImportTopic().getName())
                .messageConverter(messagingMessageConverter)
                .errorChannel(Objects.requireNonNull(errorHandler.getInputChannel()))
                .autoStartup(false)
        )
                .channel(Objects.requireNonNull(routeKafkaMessage().getInputChannel()))
                .get();
    }

When writing integration tests, how can I re-subscribe this flow to a different topic?

Best regards Daniel


Solution

  • We don't know what your serviceProperties.getImportTopic() is, but it looks like the place where you could change the topic name between tests, together with stopping and starting the flow.

    To better understand what exactly the problem is, we would need to see what you are doing in your tests.

    UPDATE

    Yes, you are right: the topics are passed to the underlying ConcurrentMessageListenerContainer by Kafka.messageDrivenChannelAdapter when it is created, and you cannot modify that state at runtime without reflection. In your test, stop the channel adapter first. Then use TestUtils.getPropertyValue(messageDrivenChannelAdapter, "messageListenerContainer", ConcurrentMessageListenerContainer.class) to obtain the container, call its getContainerProperties(), and use a DirectFieldAccessor to set the ContainerProperties.topics field to the desired random value. Finally, start the messageDrivenChannelAdapter again. The ConcurrentMessageListenerContainer creates its child containers in start() and populates their container properties from its own, so the new topic takes effect.
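The steps above can be sketched without Spring on the classpath. The class below is only an illustration: StubContainerProperties is a hypothetical stand-in for ContainerProperties, uniqueTopic and replaceTopics are made-up helper names, and plain java.lang.reflect is used in place of DirectFieldAccessor. The comments note where the real Spring calls would go in an actual test.

```java
import java.lang.reflect.Field;
import java.util.UUID;

final class TopicSwapSketch {

    /** Hypothetical stand-in for ContainerProperties: topics are fixed at construction. */
    static final class StubContainerProperties {
        private final String[] topics;

        StubContainerProperties(String... topics) {
            this.topics = topics.clone();
        }

        String[] getTopics() {
            return topics.clone();
        }
    }

    /** Builds a topic name that differs on every call, e.g. one per test method. */
    static String uniqueTopic(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }

    /**
     * Overwrites the private final "topics" field via reflection.
     * In a real test this would be roughly:
     *   new DirectFieldAccessor(containerProperties).setPropertyValue("topics", newTopics)
     */
    static void replaceTopics(Object containerProperties, String... newTopics) {
        try {
            Field field = containerProperties.getClass().getDeclaredField("topics");
            field.setAccessible(true);
            field.set(containerProperties, newTopics);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not replace topics", e);
        }
    }

    public static void main(String[] args) {
        // Real test, step 1: stop the channel adapter before touching its container.
        // Real test, step 2: obtain the container with TestUtils.getPropertyValue(...)
        //                    and call getContainerProperties() on it.
        StubContainerProperties props = new StubContainerProperties("import-topic");

        // Step 3: point the (stopped) container at a fresh topic for this test.
        String topic = uniqueTopic("import-topic");
        replaceTopics(props, topic);

        // Real test, step 4: start the channel adapter again.
        System.out.println("Now consuming from: " + String.join(",", props.getTopics()));
    }
}
```

In an actual test, this swap would typically run in a per-test setup method, so each test case gets its own topic and the adapter is only started after the topic has been replaced.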