When writing integration tests for my Spring Integration application, which uses Kafka as its message source, I often struggle to get a clean test state between two test cases, because the IntegrationFlow is configured to consume from a topic that I also need in the tests. It doesn't matter whether I use Testcontainers or EmbeddedKafka; for both, the best practice is to use a unique topic per test.
However, I don't know how to re-initialize the IntegrationFlow with a new topic between two tests.
Imagine a flow like this:
    @Bean
    public StandardIntegrationFlow startKafkaInbound() {
        return IntegrationFlow.from(Kafka
                        .messageDrivenChannelAdapter(
                                kafkaConsumerFactory,
                                ListenerMode.record,
                                serviceProperties.getImportTopic().getName())
                        .messageConverter(messagingMessageConverter)
                        .errorChannel(Objects.requireNonNull(errorHandler.getInputChannel()))
                        .autoStartup(false)
                )
                .channel(Objects.requireNonNull(routeKafkaMessage().getInputChannel()))
                .get();
    }
Now, when writing integration tests, how can I re-subscribe this flow to a different topic?
Best regards Daniel
We don't know what your serviceProperties.getImportTopic() is, but it feels like this is the place where you could change the topic name between tests, stopping the flow before the change and starting it again afterwards.
To better understand what exactly the problem is, we might need to see what you are doing in your tests.
UPDATE
Yes, you are right: the topics are passed to the underlying ConcurrentMessageListenerContainer of that Kafka.messageDrivenChannelAdapter when it is created. You cannot modify its state at runtime without reflection. In your test, stop this channel adapter first. Then use TestUtils.getPropertyValue(messageDrivenChannelAdapter, "messageListenerContainer", ConcurrentMessageListenerContainer.class) to obtain the container and call its getContainerProperties(). Use a DirectFieldAccessor to set the ContainerProperties.topics field to the desired random value, and then start the messageDrivenChannelAdapter again. The ConcurrentMessageListenerContainer creates its child containers in start() and populates the container properties for them, so the new topic is picked up on restart.
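To illustrate why the stop / modify / start order matters, here is a minimal plain-Java sketch of the mechanism. It deliberately does not use the Spring classes: FakeContainerProperties and FakeParentContainer are hypothetical stand-ins that only mimic the behaviour described above (a final topics field, and children that copy the topics in start()), and java.lang.reflect.Field plays the role that DirectFieldAccessor would play in a real test. Treat it as a sketch of the idea under those assumptions, not as the actual container implementation.

```java
import java.lang.reflect.Field;

public class TopicSwitchSketch {

    // Hypothetical stand-in for ContainerProperties: topics are fixed at construction.
    static final class FakeContainerProperties {
        private final String[] topics;

        FakeContainerProperties(String... topics) {
            this.topics = topics.clone();
        }

        String[] getTopics() {
            return topics.clone();
        }
    }

    // Hypothetical stand-in for the parent container: the "child" state is
    // rebuilt from the properties on every start().
    static final class FakeParentContainer {
        private final FakeContainerProperties properties;
        private String[] activeTopics = new String[0];

        FakeParentContainer(FakeContainerProperties properties) {
            this.properties = properties;
        }

        FakeContainerProperties getContainerProperties() {
            return properties;
        }

        void start() {
            // The child snapshots the topics at start time.
            activeTopics = properties.getTopics();
        }

        void stop() {
            activeTopics = new String[0];
        }

        String[] activeTopics() {
            return activeTopics.clone();
        }
    }

    // Stop, overwrite the final field reflectively, start again.
    static void switchTopic(FakeParentContainer container, String newTopic) {
        container.stop();
        try {
            Field topics = FakeContainerProperties.class.getDeclaredField("topics");
            topics.setAccessible(true);
            topics.set(container.getContainerProperties(), new String[] {newTopic});
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Could not replace the topics field", ex);
        }
        container.start();
    }

    public static void main(String[] args) {
        FakeParentContainer container =
                new FakeParentContainer(new FakeContainerProperties("import-topic"));
        container.start();
        switchTopic(container, "import-topic-test-1");
        System.out.println(String.join(",", container.activeTopics()));
    }
}
```

If the field were changed while the container was still running, the already-started child would keep its old snapshot, which is why the adapter has to be stopped before the change and started afterwards.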