I'm writing a test class for my Spring Integration configuration. I set a DirectChannel as the output channel of an MQTT adapter and defined an inbound IntegrationFlow
from that adapter as follows:
@Configuration
class MqttIntegrationConfig {

    @Bean
    @Qualifier("mqttChannelAdapter")
    fun mqttChannelAdapter(): MessageProducerSupport {
        val adapter = MqttPahoMessageDrivenChannelAdapter(...)
        // ... other setup omitted for brevity
        // ** make a DirectChannel and pipe it to mqtt adapter **
        val outputChannel = MessageChannels.direct().get()
        adapter.outputChannel = outputChannel
        return adapter
    }

    @Bean
    fun mqttInbound(): IntegrationFlow {
        return IntegrationFlows.from(mqttChannelAdapter())
            .handle<String> { payload, headers ->
                logger.info("payload=$payload, headers=$headers")
                payload
            }
            .channel(mqttOutboundChannel())
            .get()
    }

    // other setups are omitted for brevity
}
To test it, I used @SpringIntegrationTest with its noAutoStartup attribute so that the adapter does not connect to an actual MQTT broker.
@ExtendWith(SpringExtension::class)
@DirtiesContext
@SpringIntegrationTest(noAutoStartup = ["mqttChannelAdapter"])
@ContextConfiguration(classes = [MqttIntegrationConfig::class, MyTestConfig::class])
class MyIntegrationTest {

    @Autowired
    @Qualifier("mqttChannelAdapter")
    private lateinit var mqttChannelAdapter: MessageProducerSupport

    @Test
    fun mytest() {
        mqttChannelAdapter.outputChannel?.send(GenericMessage(1))
    }
}
When I run the mytest method, it throws a MessageDispatchingException as follows:
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=1, headers={id=be5128f8-5869-0f9d-2cbb-ddaf4dafbc8e, timestamp=1688747038419}]
But when I change the adapter's output channel from a direct channel to a queue channel, it works:
val outputChannel = MessageChannels.queue().get()
adapter.outputChannel = outputChannel
I understand that the integration channels may take some time to become ready, and that in real production usage a message would not arrive before the integration flow is ready, so a DirectChannel would work there.
To make this test work, is there a way to ensure that the test method waits until all channels are ready, so that my DirectChannel has subscribers?
This works for me as described, without any problems.
I don't know what is going on. It looks like you are using an older Spring Integration version, so a difference between our environments might be the cause.
Another guess is a missing @EnableIntegration on one of your configuration classes.
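To illustrate the @EnableIntegration guess: a plain @ContextConfiguration test has no Spring Boot auto-configuration, so the integration infrastructure is only enabled if a configuration class asks for it. As far as I know, without it the IntegrationFlow beans are not wired to their channels, which would match the "Dispatcher has no subscribers" error. A minimal sketch, assuming the annotation goes on the MqttIntegrationConfig class from the question (any configuration class in the test context should work the same way):

```kotlin
import org.springframework.context.annotation.Configuration
import org.springframework.integration.config.EnableIntegration

// Assumption: the annotation is placed on the question's own config class.
// @EnableIntegration registers the Spring Integration infrastructure beans.
@Configuration
@EnableIntegration
class MqttIntegrationConfig {
    // existing @Bean definitions (mqttChannelAdapter, mqttInbound, ...) stay unchanged
}
```

If this is the cause, the test should pass with the DirectChannel and without any waiting, because the flow's subscribers are registered while the application context starts.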
As a bonus: we have a dedicated Kotlin DSL for Spring Integration: https://docs.spring.io/spring-integration/docs/current/reference/html/kotlin-dsl.html#kotlin-dsl
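For a rough idea of what a flow looks like in the Kotlin DSL, here is a minimal sketch. It assumes Spring Integration 5.3 or later. The class, bean, and channel names (KotlinDslSketchConfig, sketchInputChannel, sketchOutputChannel) are hypothetical and only stand in for the MQTT adapter and outbound channel from the question:

```kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.channel.QueueChannel
import org.springframework.integration.config.EnableIntegration
import org.springframework.integration.dsl.integrationFlow

// proxyBeanMethods = false lets this compile and load without the
// kotlin-spring plugin (Kotlin classes are final by default).
@Configuration(proxyBeanMethods = false)
@EnableIntegration
class KotlinDslSketchConfig {

    // Hypothetical output channel; a QueueChannel buffers messages so the
    // result can be polled in a test without a downstream subscriber.
    @Bean
    fun sketchOutputChannel() = QueueChannel()

    // Flow starting from a channel referenced by name ("sketchInputChannel"
    // is a hypothetical name); if no such bean exists, the framework
    // creates a DirectChannel for it.
    @Bean
    fun sketchInbound() =
        integrationFlow("sketchInputChannel") {
            handle<String> { payload, headers ->
                println("payload=$payload, headers=$headers")
                payload // pass the payload on to the next step
            }
            channel("sketchOutputChannel")
        }
}
```

The Kotlin DSL also has an integrationFlow overload that takes a MessageProducerSupport, so the same shape should apply when starting from the mqttChannelAdapter() bean instead of a named channel.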