Search code examples
spring-integrationibm-mqspring-integration-dsl

Getting "Dispatcher has no subscribers for jms-channel" using Jms Pub/Sub with Spring Integration using DSL


I have crafted code based on the Java DSL documentation for Spring Integration. When I run the code, I keep on getting the following warning, even though the logs also suggest that the message has been passed successfully to both subscribers.


2021-03-04 17:21:25.589  WARN 46929 --- [enerContainer-1] bleJmsChannel$DispatchingMessageListener : Dispatcher has no subscribers for jms-channel 'application.jmsPublishSubscribeChannel'.

org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:153) ~[spring-integration-core-5.4.3.jar:5.4.3]
    at org.springframework.integration.jms.SubscribableJmsChannel$DispatchingMessageListener.onMessage(SubscribableJmsChannel.java:229) ~[spring-integration-jms-5.4.4.jar:5.4.4]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:761) ~[spring-jms-5.3.3.jar:5.3.3]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:699) ~[spring-jms-5.3.3.jar:5.3.3]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.3.3.jar:5.3.3]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) ~[spring-jms-5.3.3.jar:5.3.3]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) ~[spring-jms-5.3.3.jar:5.3.3]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189) ~[spring-jms-5.3.3.jar:5.3.3]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179) ~[spring-jms-5.3.3.jar:5.3.3]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076) ~[spring-jms-5.3.3.jar:5.3.3]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

2021-03-04 17:21:26.598  INFO 46929 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload={"Greeting":"Hello from Node at Thu Mar 04 2021 17:21:25 GMT+0000 (Greenwich Mean Time)"}, headers={id=50ab26ae-7a3d-8a2e-f694-94928b5097d6, timestamp=1614878485580}]


I have @EnableIntegration on my application, and my pub / sub component looks like:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.BroadcastCapableChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import javax.jms.ConnectionFactory;
import java.util.concurrent.TimeUnit;

@Component
public class MessageFlowPub {

    protected final Log logger = LogFactory.getLog(getClass());

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public BroadcastCapableChannel jmsPublishSubscribeChannel() {
        return Jms.publishSubscribeChannel(connectionFactory)
                .destination("dev/")
                .get();
    }

    @Bean
    public IntegrationFlow pubSubFlow() {
        return f -> f
                .publishSubscribeChannel(jmsPublishSubscribeChannel(),
                        pubsub -> pubsub
                                .subscribe(subFlow -> subFlow
                                        .channel(c -> c.queue("jmsPubSubBridgeChannel1")))
                                .subscribe(subFlow -> subFlow
                                        .channel(c -> c.queue("jmsPubSubBridgeChannel2"))))
                .log()
                .handle(System.out::println)
                ;
    }

    @Bean
    public IntegrationFlow msgHandler1() {
        return IntegrationFlows.from("jmsPubSubBridgeChannel1")
                .bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
                .log()
                .handle(System.out::println)
                .get();
    }

    @Bean
    public IntegrationFlow msgHandler2() {
        return IntegrationFlows.from("jmsPubSubBridgeChannel2")
                .bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
                .log()
                .handle(System.out::println)
                .get();
    }
    
}

The warning message leads me to think that I am doing something fundamentally wrong here, but I can't see what.


Solution

  • OK, I found what's the problem.

    You use a @Component and @Bean methods combination. More over you try to call one bean method from another: .publishSubscribeChannel(jmsPublishSubscribeChannel(). This is not possible outside of the @Configuration class. The annotation config just with the @Component is considered as "lightweight" and therefore we can't call bean methods from each other - they are just not proxied to supply a proper dependency injection via method call.

    This one should work for you as well:

    @Bean
    public BroadcastCapableChannel jmsPublishSubscribeChannel() {
        return Jms.publishSubscribeChannel(connectionFactory)
                .destination("dev/")
                .get();
    }
    
    @Bean
    public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
        return f -> f
                .publishSubscribeChannel(jmsPublishSubscribeChannel,
    

    Please, read more about lightweight config and proxyBeanMethods = false: https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-java-basic-concepts