
Sending message to DirectChannel throws "no subscribers" exception when using SpringIntegrationTest


I'm writing a test class for my Spring Integration flow. I set a DirectChannel as the output channel of the MQTT adapter and define an inbound IntegrationFlow from that adapter as follows:

@Configuration
class MqttIntegrationConfig {
    @Bean
    @Qualifier("mqttChannelAdapter")
    fun mqttChannelAdapter(): MessageProducerSupport {
        val adapter = MqttPahoMessageDrivenChannelAdapter(...)
        // ... other setup omitted for brevity

        // ** make a DirectChannel and pipe it to mqtt adapter **
        val outputChannel = MessageChannels.direct().get()
        adapter.outputChannel = outputChannel

        return adapter
    }

    @Bean
    fun mqttInbound(): IntegrationFlow {
        return IntegrationFlows.from(mqttChannelAdapter())
            .handle<String> { payload, headers ->
                logger.info("payload=$payload, headers=$headers")
                payload
            }
            .channel(mqttOutboundChannel())
            .get()
    }

    // other setups are omitted for brevity

}

To test it, I used @SpringIntegrationTest with its noAutoStartup attribute so that the adapter does not connect to an actual MQTT broker.

@ExtendWith(SpringExtension::class)
@DirtiesContext
@SpringIntegrationTest(noAutoStartup = ["mqttChannelAdapter"])
@ContextConfiguration(classes = [MqttIntegrationConfig::class, MyTestConfig::class])
class MyIntegrationTest {

    @Autowired
    @Qualifier("mqttChannelAdapter")
    private lateinit var mqttChannelAdapter: MessageProducerSupport

    @Test
    fun mytest() {
        mqttChannelAdapter.outputChannel?.send(GenericMessage(1))
    }
}

When I run the mytest method, it throws a MessageDispatchingException:

Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=1, headers={id=be5128f8-5869-0f9d-2cbb-ddaf4dafbc8e, timestamp=1688747038419}]

But when I replace the direct channel that is set as the adapter's output channel with a queue channel, the test works.

val outputChannel = MessageChannels.queue().get()
adapter.outputChannel = outputChannel

I understand that it takes time for the integration channels to become ready(?), and that in production no message would arrive before the integration flow is ready, so a DirectChannel would work there.

To make this test work, is there a way to ensure that the test method waits until all channels are ready, so that my DirectChannel has subscribers?


Solution

  • Works as described, no problems at all.

    I don't know what is going on. It looks like you are using an old Spring Integration version, so the difference between our environments might be the cause.

    Another guess is a missing @EnableIntegration on one of your configuration classes.

    As a bonus: we have a dedicated Kotlin DSL for Spring Integration: https://docs.spring.io/spring-integration/docs/current/reference/html/kotlin-dsl.html#kotlin-dsl
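
The two suggestions above (adding @EnableIntegration and using the Kotlin DSL) could be combined roughly as follows. This is only a sketch under several assumptions: the class name MqttInboundFlowConfig is hypothetical, the mqttChannelAdapter and mqttOutboundChannel beans are assumed to be defined elsewhere as in the question (with mqttOutboundChannel assumed to be a MessageChannel bean), and the kotlin-spring compiler plugin is assumed to open the configuration class, as the question's own config would also require. It is not confirmed to fix the "no subscribers" error.

```kotlin
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.config.EnableIntegration
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.integrationFlow
import org.springframework.integration.endpoint.MessageProducerSupport
import org.springframework.messaging.MessageChannel

// Hypothetical configuration class name, used for illustration only.
@Configuration
@EnableIntegration // registers the Spring Integration infrastructure beans
class MqttInboundFlowConfig {

    private val logger = LoggerFactory.getLogger(MqttInboundFlowConfig::class.java)

    // Both parameters are assumed to be beans declared elsewhere,
    // named as in the question.
    @Bean
    fun mqttInbound(
        @Qualifier("mqttChannelAdapter") mqttChannelAdapter: MessageProducerSupport,
        @Qualifier("mqttOutboundChannel") mqttOutboundChannel: MessageChannel
    ): IntegrationFlow =
        // Kotlin DSL equivalent of IntegrationFlows.from(adapter)...get()
        integrationFlow(mqttChannelAdapter) {
            handle<String> { payload, headers ->
                logger.info("payload=$payload, headers=$headers")
                payload // pass the payload on unchanged
            }
            channel(mqttOutboundChannel)
        }
}
```

Injecting the adapter and the outbound channel as method parameters, instead of calling the @Bean methods directly, keeps the sketch independent of how those beans are created.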