I have crafted code based on the Java DSL documentation for Spring Integration. When I run the code, I keep on getting the following warning, even though the logs also suggest that the message has been passed successfully to both subscribers.
2021-03-04 17:21:25.589 WARN 46929 --- [enerContainer-1] bleJmsChannel$DispatchingMessageListener : Dispatcher has no subscribers for jms-channel 'application.jmsPublishSubscribeChannel'.
org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:153) ~[spring-integration-core-5.4.3.jar:5.4.3]
at org.springframework.integration.jms.SubscribableJmsChannel$DispatchingMessageListener.onMessage(SubscribableJmsChannel.java:229) ~[spring-integration-jms-5.4.4.jar:5.4.4]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:761) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:699) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076) ~[spring-jms-5.3.3.jar:5.3.3]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2021-03-04 17:21:26.598 INFO 46929 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : GenericMessage [payload={"Greeting":"Hello from Node at Thu Mar 04 2021 17:21:25 GMT+0000 (Greenwich Mean Time)"}, headers={id=50ab26ae-7a3d-8a2e-f694-94928b5097d6, timestamp=1614878485580}]
I have @EnableIntegration on my application, and my pub/sub component looks like:
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.BroadcastCapableChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import javax.jms.ConnectionFactory;
import java.util.concurrent.TimeUnit;

@Component
public class MessageFlowPub {

    protected final Log logger = LogFactory.getLog(getClass());

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public BroadcastCapableChannel jmsPublishSubscribeChannel() {
        return Jms.publishSubscribeChannel(connectionFactory)
                .destination("dev/")
                .get();
    }

    @Bean
    public IntegrationFlow pubSubFlow() {
        return f -> f
                .publishSubscribeChannel(jmsPublishSubscribeChannel(),
                        pubsub -> pubsub
                                .subscribe(subFlow -> subFlow
                                        .channel(c -> c.queue("jmsPubSubBridgeChannel1")))
                                .subscribe(subFlow -> subFlow
                                        .channel(c -> c.queue("jmsPubSubBridgeChannel2"))))
                .log()
                .handle(System.out::println);
    }

    @Bean
    public IntegrationFlow msgHandler1() {
        return IntegrationFlows.from("jmsPubSubBridgeChannel1")
                .bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
                .log()
                .handle(System.out::println)
                .get();
    }

    @Bean
    public IntegrationFlow msgHandler2() {
        return IntegrationFlows.from("jmsPubSubBridgeChannel2")
                .bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
                .log()
                .handle(System.out::println)
                .get();
    }
}
The warning message leads me to think that I am doing something fundamentally wrong here, but I can't see what.
OK, I found the problem.
You use a combination of @Component and @Bean methods. Moreover, you call one bean method from another: .publishSubscribeChannel(jmsPublishSubscribeChannel(). This is not possible outside of a @Configuration class. Annotation config with just @Component is considered "lightweight", so bean methods cannot call each other: they are not proxied to supply proper dependency injection via a method call. In that mode the call is a plain Java method invocation that returns a new channel instance, which would explain why the channel bean registered in the context reports no subscribers while the messages still reach your sub-flows.
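To make the difference concrete, here is a rough plain-Java model of it, with no Spring involved. Everything in it is hypothetical and for illustration only: `Channel` stands in for the subscribable channel, `Registry` stands in for the container's singleton cache, and the `injected` flag switches between calling the factory method directly (what happens in lightweight mode) and asking the registry for the shared instance (what injection or a proxied @Configuration method gives you).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class LiteModeSketch {

    /** Hypothetical stand-in for a subscribable channel. */
    static class Channel {
        final List<String> subscribers = new ArrayList<>();
    }

    /** Hypothetical stand-in for the container's singleton cache. */
    static class Registry {
        private final Map<String, Object> singletons = new HashMap<>();

        @SuppressWarnings("unchecked")
        <T> T getOrCreate(String name, Supplier<T> factory) {
            // The factory runs only the first time a name is requested.
            return (T) singletons.computeIfAbsent(name, key -> factory.get());
        }
    }

    /** Plays the role of the @Bean factory method: every plain call builds a new object. */
    static Channel jmsPublishSubscribeChannel() {
        return new Channel();
    }

    /** Returns how many subscribers the registered (container-managed) channel ends up with. */
    static int subscribersOnRegisteredChannel(boolean injected) {
        Registry registry = new Registry();
        Supplier<Channel> factory = LiteModeSketch::jmsPublishSubscribeChannel;

        // The container registers the channel bean once.
        Channel registered = registry.getOrCreate("jmsPublishSubscribeChannel", factory);

        // The flow obtains "its" channel either from the registry or by a direct call.
        Channel usedByFlow = injected
                ? registry.getOrCreate("jmsPublishSubscribeChannel", factory)
                : jmsPublishSubscribeChannel();

        usedByFlow.subscribers.add("subFlow1");
        usedByFlow.subscribers.add("subFlow2");
        return registered.subscribers.size();
    }

    public static void main(String[] args) {
        System.out.println("direct call: " + subscribersOnRegisteredChannel(false)); // prints "direct call: 0"
        System.out.println("injected: " + subscribersOnRegisteredChannel(true));     // prints "injected: 2"
    }
}
```

With the direct call, the subscribers are attached to a second object, so the registered channel stays empty. This is only a model of the behaviour described above, not how Spring is implemented internally.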
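Annotating the class with @Configuration instead of @Component fixes this. Alternatively, keep @Component and inject the channel as a parameter of the @Bean method; this should work for you as well: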
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
    return Jms.publishSubscribeChannel(connectionFactory)
            .destination("dev/")
            .get();
}

@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
    return f -> f
            .publishSubscribeChannel(jmsPublishSubscribeChannel,
Please read more about lightweight config and proxyBeanMethods = false:
https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-java-basic-concepts