
Dispatcher has no subscribers while using spring-integration-mail


We have a multitenant application that lets users create IMAP mail fetchers (aka mail receivers) on demand and handles each of them separately. For this we decided to use spring-integration-mail (v6.1.4). Since users can create, edit, pause, and resume the mail fetchers from the UI, we also decided to register and remove the components dynamically, as described in the documentation. Here is the code that creates and registers the component:

integrationFlowContext.registration(
    IntegrationFlow
        .from(
            Mail.imapInboundAdapter(
                factory.create(imapConfiguration) // creates an ImapMailReceiver instance
            )
        ) {
            it.poller(
                Pollers
                    .trigger(PeriodicTrigger(ofSeconds(10)))
                    .taskExecutor(taskExecutor)
                    .transactional(transactionalManager)
                    .errorHandler(errorHandler)
            )
        }
        .log<Message<*>> {
            mailReceiverLog.info { "Receiving message: Message(headers=${it.headers.map { header -> "${header.key}=${header.value}" }}, payload=${it.payload})" }
        }
        .channel(MessageChannels.executor(taskExecutor))
        .transform(MailReceiverMessageTransformer())
        .handle { payload: MailMessage ->
            try {
                // handle here
            } catch (e: Throwable) {
                log.error(
                    "Could not handle mail message: $payload, execution will continue",
                    e
                )
            }
        }
        .get()
)
    .autoStartup(true)
    .id(id)
    .useFlowIdAsPrefix()
    .register()

The code for removal:

// Destroy the registration only if a flow with this id is currently registered
integrationFlowContext.getRegistrationById(id)?.destroy()

Other components used:

@Bean
fun mailReceiverTaskExecutor(): TaskExecutor =
    ThreadPoolTaskExecutor().apply {
        corePoolSize = 20
    }

@Bean
fun mailReceiverTransactionManager(): PseudoTransactionManager = PseudoTransactionManager()

This setup is pretty stable once started. But after a user disables a mail receiver and enables it again, it can no longer collect emails from the IMAP store and fails with the "Dispatcher has no subscribers" exception. The weirdest part is that, according to the logs, there is no evidence of the channel losing its subscriber:

// Disable aka mail-receiver removal starts here
Unregistering mail receiver: mailreceiver-1
stopped bean 'mailreceiver-1.org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0'
Removing {bridge} as a subscriber to the 'mailreceiver-1.channel#1' channel
Channel 'backend.mailreceiver-1.channel#1' has 0 subscriber(s).
stopped bean 'mailreceiver-1.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
Removing {transformer} as a subscriber to the 'mailreceiver-1.channel#2' channel
Channel 'backend.mailreceiver-1.channel#2' has 0 subscriber(s).
stopped bean 'mailreceiver-1.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
Removing {service-activator} as a subscriber to the 'mailreceiver-1.channel#3' channel
Channel 'backend.mailreceiver-1.channel#3' has 0 subscriber(s).
stopped bean 'mailreceiver-1.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'
Mail receiver successfully unregistered: mailreceiver-1
// Enable aka mail-receiver registration starts here
Registering mail receiver: mailreceiver-1
Adding {service-activator} as a subscriber to the 'mailreceiver-1.channel#3' channel
Channel 'backend.mailreceiver-1.channel#3' has 1 subscriber(s).
started bean 'mailreceiver-1.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'
Adding {transformer} as a subscriber to the 'mailreceiver-1.channel#2' channel
Channel 'backend.mailreceiver-1.channel#2' has 1 subscriber(s).
started bean 'mailreceiver-1.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
Adding {bridge} as a subscriber to the 'mailreceiver-1.channel#1' channel
Channel 'backend.mailreceiver-1.channel#1' has 1 subscriber(s).
started bean 'mailreceiver-1.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
started bean 'mailreceiver-1.org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0'
Mail receiver successfully registered: mailreceiver-1
org.springframework.integration.support.MessagingExceptionWrapper: null
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:477)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:460)
    at java.base/jdk.internal.reflect.GeneratedMethodAccessor112.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:391)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:244)
    at jdk.proxy2/jdk.proxy2.$Proxy180.call(Unknown Source)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'backend.mailreceiver-1.channel#1'.
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:375)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:346)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:326)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:299)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:196)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:474)
    ... 19 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 29 common frames omitted

Any ideas?

Update #1: I wonder whether the executor channel might be the issue, due to a lack of available threads?

Update #2: Based on Artem's suggestions I have refactored the code to the following:

integrationFlowContext.registration(
    IntegrationFlow
        .from(
            Mail.imapInboundAdapter(
                factory.create(imapConfiguration) // creates an ImapMailReceiver instance
            )
        ) {
            it.poller(
                Pollers
                    .trigger(PeriodicTrigger(ofSeconds(10)))
                    .taskExecutor(taskExecutor)
                    .transactional(transactionalManager)
                    .errorHandler(errorHandler)
            )
        }
        .log<Message<*>> {
            mailReceiverLog.info { "Receiving message: Message(headers=${it.headers.map { header -> "${header.key}=${header.value}" }}, payload=${it.payload})" }
        }
        .channel("handleMailChannel")
        .get()
)
    .autoStartup(true)
    .id(id)
    .useFlowIdAsPrefix()
    .register()

and

integrationFlowContext.registration(
    IntegrationFlow
        .from("handleMailChannel")
        .handle { payload: MailMessage ->
            try {
                // handle here
            } catch (e: Throwable) {
                log.error(
                    "Could not handle mail message: $payload, execution will continue",
                    e
                )
            }
        }
        .get()
)
    .autoStartup(true)
    .id(id)
    .useFlowIdAsPrefix()
    .register()

In other words, the dynamic flows are now very light (they only act as mail fetchers and forward every message to another channel), while the heavier handling flow is static and processes all incoming messages regardless of their origin.


Solution

  • First of all, there is a dedicated Kotlin DSL: https://docs.spring.io/spring-integration/reference/kotlin-dsl.html. There is no need to fight the Kotlin language with the Java DSL API.

    Secondly, there is no need for an extra ExecutorChannel in between, since the poller already does its work on separate scheduled threads. Moreover, I see that you even shift its work to another executor with taskExecutor(taskExecutor).

    One more point: dynamic flow registration is heavy enough that it is better to revise your logic to register as few components as possible. Probably everything after Mail.imapInboundAdapter() is the same for all your dynamic flows. Therefore, look into something like a singleton IntegrationFlow that starts from a channel, and finish each dynamic flow with channel("mailProcessingFlowInput") so that polled messages are sent to that channel. This way only a small number of components are registered at runtime and removed afterwards. The rest stays in place for the whole application lifecycle.
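
The three points above could be combined roughly as follows. This is only a minimal sketch written against the Kotlin DSL's integrationFlow() builder, not the asker's actual code: MailProcessingConfiguration, MailFetcherRegistry, and the MAIL_PROCESSING_INPUT constant are hypothetical names introduced for illustration, the println stands in for the real transformation and handling logic, and the poller is reduced to a plain fixed delay (the task executor, transaction manager, and error handler from the question would still need to be configured on it).

```kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.Pollers
import org.springframework.integration.dsl.context.IntegrationFlowContext
import org.springframework.integration.dsl.integrationFlow
import org.springframework.integration.mail.ImapMailReceiver
import org.springframework.integration.mail.dsl.Mail

// Channel name taken from the answer; both flows must agree on it.
const val MAIL_PROCESSING_INPUT = "mailProcessingFlowInput"

// Hypothetical configuration class holding the static part of the pipeline.
@Configuration
class MailProcessingConfiguration {

    // Singleton flow: registered once at startup and shared by all mail fetchers.
    @Bean
    fun mailProcessingFlow(): IntegrationFlow =
        integrationFlow(MAIL_PROCESSING_INPUT) {
            handle { message ->
                // Placeholder for the real transformation and handling logic.
                println("Processing mail message: ${message.payload}")
            }
        }
}

// Hypothetical helper that owns the light, per-tenant dynamic flows.
class MailFetcherRegistry(private val flowContext: IntegrationFlowContext) {

    fun register(id: String, receiver: ImapMailReceiver) {
        // The dynamic flow only polls the mailbox and forwards to the shared channel.
        // No ExecutorChannel is involved: the poller already runs on scheduler threads.
        val fetcherFlow = integrationFlow(
            Mail.imapInboundAdapter(receiver),
            { poller(Pollers.fixedDelay(10_000L)) } // 10 seconds between polls
        ) {
            channel(MAIL_PROCESSING_INPUT)
        }
        flowContext.registration(fetcherFlow)
            .id(id)
            .useFlowIdAsPrefix()
            .register()
    }

    fun unregister(id: String) {
        // Destroys only the fetcher flow; the static processing flow stays registered.
        flowContext.getRegistrationById(id)?.destroy()
    }
}
```

With this split, pausing and resuming a mail fetcher registers and destroys only the polling channel adapter and its outbound channel reference, while the subscriber on mailProcessingFlowInput is never touched.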