
MessageDeliveryException: Dispatcher has no subscribers when receiving a message


UPDATE: After moving fromTcp() from the holder class to the endpoint class, everything worked again. Now I'm baffled, because I don't see why that change makes it work.

After some headaches and searching all over Stack Overflow, I still get a "Dispatcher has no subscribers" error when I receive a message from the server, but not when I send a response back, where everything works fine.

Connection generator:

@Service
@EnableIntegration
public class TcpConnectionsHolder {


@Autowired
private IntegrationFlowContext flowContext;


/**
 * Definition of flow channels
 * 
 * @return MessageChannel
 */
@Bean
public MessageChannel fromTcp() {
    final DirectChannel channel = new DirectChannel();
    channel.addInterceptor(new ChannelInterceptorAdapter() {
        @Override
        public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
            // Parse Message byte[] to StringHex
            final byte[] bMessagePayload = (byte[]) message.getPayload();
            return MessageBuilder.withPayload(Hex.encodeHexString(bMessagePayload))
                    .copyHeaders(message.getHeaders()).build();
        }
    });
    return channel;
}

private final LinkedHashMap<String, TcpNetClientConnectionFactory> clientConnect =
        new LinkedHashMap<String, TcpNetClientConnectionFactory>();

private final LinkedHashMap<String, TcpReceivingChannelAdapter> reciverAdapter =
        new LinkedHashMap<String, TcpReceivingChannelAdapter>();

private final LinkedHashMap<String, MessageChannel> sendingAdpater =
        new LinkedHashMap<String, MessageChannel>();

public MessageChannel getMessageChannel(String host, int port) {
    return sendingAdpater.get(host+port);
}


public TcpReceivingChannelAdapter getReceiverChannel(String host, int port) {
    return reciverAdapter.get(host+port);
}

private TcpNetClientConnectionFactory getclientConnectionFactory(String host, int port,int headBytes) {
    
    TcpNetClientConnectionFactory cf = clientConnect.get(host+port);
    if(cf==null) {
        cf = new TcpNetClientConnectionFactory(host, port);
        final ByteArrayLengthHeaderSerializer by = new ByteArrayLengthHeaderSerializer(headBytes);
        cf.setSingleUse(false);
        cf.setSoKeepAlive(true);
        cf.setSerializer(by);
        cf.setDeserializer(by);
        clientConnect.put(host+port,cf);
    }
    return cf;
}
public TcpReceivingChannelAdapter addReceiverChannel(String host, int port) {
    return addReceiverChannel(host,port,2,2000);
}
public TcpReceivingChannelAdapter addReceiverChannel(String host, int port,int headBytes,int retryInterval) {
    TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
    TcpNetClientConnectionFactory cf = getclientConnectionFactory(host,port,headBytes);
    adapter.setConnectionFactory(cf);
    adapter.setClientMode(true);
    adapter.setErrorChannelName("errorChannel");
    adapter.setRetryInterval(retryInterval);
    adapter.setOutputChannel(fromTcp());
    
    IntegrationFlow flow = IntegrationFlows.from(adapter).get();
    
    this.flowContext.registration(flow).id(host+port + ".in").addBean(cf).register();
    
    this.reciverAdapter.put(host+port, adapter);
    
    return adapter;
}

public MessageChannel addSendingChannel(String host, int port) {
    return addSendingChannel(host,port,2);
}

public MessageChannel addSendingChannel(String host, int port,int headBytes) {
       TcpSendingMessageHandler sender = new TcpSendingMessageHandler();
       sender.setConnectionFactory(getclientConnectionFactory(host,port,headBytes));
                
        IntegrationFlow flow = f -> f.handle(sender);
        
        IntegrationFlowRegistration flowRegistration =
                this.flowContext.registration(flow).id(host+port + ".out").register();
                
        MessageChannel inputChannel = flowRegistration.getInputChannel();
        this.sendingAdpater.put(host+port, inputChannel);
        return inputChannel;
}

public void removeReceiverChannel(String host, int port) {
    this.reciverAdapter.remove(host+port);
    this.flowContext.remove(host+port + ".in");
}

public void removeSendingChannel(String host, int port) {
    this.sendingAdpater.remove(host+port);
    this.flowContext.remove(host+port + ".out");
}

}

A message endpoint:

@Configuration
@MessageEndpoint
public class BridgeMessageEndpoint {

private static final Logger LOGGER = LoggerFactory.getLogger(BridgeMessageEndpoint.class);

@Autowired
private ApplicationContext applicationContext;

@ServiceActivator(inputChannel = "fromTcp")
public void outbound(final String inMessage, final @Headers Map<String, Object> headerMap) {
    sendToApi(inMessage, headerMap);
}

private void sendToApi(final String inMessage, final Map<String, Object> headerMap) {
    LOGGER.debug("Retrieving the Hex message {}", inMessage);
    final PaymentOrder paymentOrder = new PaymentOrder();
    paymentOrder.setMessage(inMessage);
    final SplitterRestClient splitterRestClient = applicationContext.getBean(SplitterRestClient.class);
    splitterRestClient.reportPayment(paymentOrder, headerMap);
}

@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler logger() {
    final LoggingHandler loggingHandler = new LoggingHandler(LoggingHandler.Level.DEBUG.name());
    loggingHandler.setLoggerName("Log");
    return loggingHandler;
}

@Bean
public IntegrationFlow toTcp() {
    return f -> f.route(new TcpRouter());
}

}

And then the initialization of the holder:

@Component
public class TcpConnectionsController implements CommandLineRunner{

@Autowired
private TcpConnectionsHolder holder;

@Autowired
private ListNodeConfig listNodes;

@Value("${socket.tcp.headBytes}")
private int headBytes;

@Value("${socket.tcp.retryInterval}")
private int retryInterval;

@Override
public void run(String... args) throws Exception {
    
    for(Node node:listNodes.getNodes()) {
        holder.addReceiverChannel(node.getIp(), node.getPort(),headBytes,retryInterval);
        holder.addSendingChannel(node.getIp(), node.getPort(),headBytes);
    }
    
}

}

I know the problem is in fromTcp(), but however much I search, I cannot see where the error lies.

EDIT: Complete stack trace:

[INFO  ] (DefaultLifecycleProcessor.java:343) org.springframework.context.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2018/06/22 13:56:47,454 [INFO  ] (DocumentationPluginsBootstrapper.java:151) springfox.documentation.spring.web.plugins.DocumentationPluginsBootstrapper : Context refreshed
2018/06/22 13:56:47,504 [INFO  ] (DocumentationPluginsBootstrapper.java:154) springfox.documentation.spring.web.plugins.DocumentationPluginsBootstrapper : Found 1 custom documentation plugin(s)
2018/06/22 13:56:47,555 [INFO  ] (ApiListingReferenceScanner.java:41) springfox.documentation.spring.web.scanners.ApiListingReferenceScanner : Scanning for api listing references
2018/06/22 13:56:47,826 [INFO  ] (DirectJDKLog.java:180) org.apache.coyote.http11.Http11NioProtocol : Initializing ProtocolHandler ["http-nio-8080"]
2018/06/22 13:56:47,827 [INFO  ] (DirectJDKLog.java:180) org.apache.coyote.http11.Http11NioProtocol : Starting ProtocolHandler ["http-nio-8080"]
2018/06/22 13:56:47,829 [INFO  ] (DirectJDKLog.java:180) org.apache.tomcat.util.net.NioSelectorPool : Using a shared selector for servlet write/read
2018/06/22 13:56:47,836 [INFO  ] (TomcatEmbeddedServletContainer.java:201) org.springframework.boot.context.embedded.tomcat.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http)
2018/06/22 13:56:47,902 [INFO  ] (AbstractConnectionFactory.java:481) org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory : started 180.112.19.1153115.inorg.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory#0, host=180.112.19.115, port=3115
2018/06/22 13:56:47,922 [INFO  ] (AbstractEndpoint.java:120) org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter : started org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter#0
2018/06/22 13:56:47,949 [INFO  ] (EventDrivenConsumer.java:108) org.springframework.integration.endpoint.EventDrivenConsumer : Adding {ip:tcp-outbound-channel-adapter} as a subscriber to the '180.112.19.1153115.out.input' channel
2018/06/22 13:56:47,950 [INFO  ] (AbstractSubscribableChannel.java:81) org.springframework.integration.channel.DirectChannel : Channel 'ck-da-bridge:local:8080.180.112.19.1153115.out.input' has 1 subscriber(s).
2018/06/22 13:56:47,950 [INFO  ] (AbstractConnectionFactory.java:481) org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory : started 180.112.19.1153115.inorg.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory#0, host=180.112.19.115, port=3115
2018/06/22 13:56:47,950 [INFO  ] (AbstractEndpoint.java:120) org.springframework.integration.endpoint.EventDrivenConsumer : started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
2018/06/22 13:56:47,953 [INFO  ] (StartupInfoLogger.java:57) com.santander.ck.bridge.spring.Application : Started Application in 13.078 seconds (JVM running for 14.159)
2018/06/22 13:57:01,164 [ERROR ] (LoggingHandler.java:192) org.springframework.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'ck-da-bridge:local:8080.180.112.19.1153115.in.channel#0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=f0f8f0f0c2200000800000020400000000000000f0f5f9f9f1f0f3f0f6f2f2f1f3f5f6f5f5f0f0f0f0f1f2f0f6f0f2f2f0f2f0f0f0f9d4c3c3f0f1f1f1e3f0f0f8f2, headers={ip_tcp_remotePort=3115, ip_connectionId=amm000ammamm012059ammamm000ammamm012059ammamm000ammamm012059ammamm000ammamm000ammamm012059ammamm000ammamm012059ammamm000ammamm012059ammamm000ammamm000ammamm012059amm.XYZscisbXYZXYZscisbscisbXYZXYZscisbXYZXYZscisbscisbXYZXYZscisb.ABCABCABCABCABCisbanABCisbanABCisbanABCABCisban.corp:0000311500000000311531150000000031150000000031153115000000003115:53402:3722e84f-49ce-4e77-a4a1-6e82cefe58b4, ip_localInetAddress=/119218011921801192180119211921801192180119218011921192180.010100101101001010010110100101.213021302130221302130213022130.111911119119111191111911911119, ip_address=192180192180192180192192180192180192180192192180.011200112112001120011211200112.21921921922192192192219.111511115115111151111511511115, id=00002c6e2e0900002c6e2e0900002c6e2e09000000002c6e2e0900002c6e2e0900002c6e2e09000000002c6e2e09-9faa-a56e-3921-edf7a03a9a3c, ip_hostname=amm000ammamm012059ammamm000ammamm000ammamm012059ammamm012059ammamm000ammamm000ammamm012059ammamm000ammamm000ammamm012059ammamm012059ammamm000ammamm000ammamm012059amm.XYZscisbXYZscisbXYZscisbXYZXYZscisbXYZscisbXYZscisbXYZXYZscisb.ABCisbanABCABCisbanisbanABCABCisbanABCABCnABCABC.corp, timestamp=1529668621142}], failedMessage=GenericMessage [payload=f0f000022f0f6e3f0f0f8f2, headers={ip_tcp_remotePort=0000311500003115000031150000000031150000311500003115000000003115, 
ip_connectionId=amm000ammamm012059ammamm000ammamm000ammamm012059ammamm012059ammamm000ammamm000ammamm012059ammamm000ammamm000ammamm012059ammamm012059ammamm000ammamm000ammamm012059amm.scisb.XYZXYZXYZXYZXYZisbanXYZisbanXYZisbanXYZXYZisban.corp:0000311500000000311531150000000031150000000031153115000000003115:53402:3722e84f-49ce-4e77-a4a1-6e82cefe58b4, ip_localInetAddress=/192180192180192180192192180192180192180192192180.010100101101001010010110100101.213021302130221302130213022130.111911119119111191111911911119, ip_address=192180192180192180192192180192180192180192192180.011200112112001120011211200112.21921921922192192192219.111511115115111151111511511115, id=00002c6e2e0900002c6e2e0900002c6e2e09000000002c6e2e0900002c6e2e0900002c6e2e09000000002c6e2e09-9faa-a56e-3921-edf7a03a9a3c, ip_hostname=amm000ammamm012059ammamm000ammamm000ammamm012059ammamm012059ammamm000ammamm000ammamm012059ammamm000ammamm000ammamm012059ammamm012059ammamm000ammamm000ammamm012059amm.scisb..corp, timestamp=1529668621142}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:93)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
    at org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter.onMessage(TcpReceivingChannelAdapter.java:88)
    at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:182)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=f0f8f0f0c2200000800000020400000000000000f0f5f9f9f1f0f3f0f6f2f2f1f3f5f6f5f5f0f0f0f0f1f2f0f6f0f2f2f0f2f0f0f0f9d4c3c3f0f1f1f1e3f0f0f8f2, headers={ip_tcp_remotePort=3115, ip_connectionId=amm000ammamm012059ammamm000ammamm012059ammamm000ammamm012059ammamm000ammamm000ammamm012059ammamm000ammamm012059ammamm000ammamm012059ammamm000ammamm000ammamm012059amm.XYZscisbXYZXYZscisbscisbXYZXYZscisbXYZXYZscisbscisbXYZXYZscisb.ABCABCABCABCABCisbanABCisbanABCisbanABCABCisban.corp:0000311500000000311531150000000031150000000031153115000000003115:53402:3722e84f-49ce-4e77-a4a1-6e82cefe58b4, ip_localInetAddress=/192180192180192180192192180192180192180192192180.010100101101001010010110100101.213021302130221302130213022130.111911119119111191111911911119, ip_address=192180192180192180192192180192180192180192192180.011200112112001120011211200112.21921921922192192192219.111511115115111151111511511115, id=00002c6e2e0900002c6e2e0900002c6e2e09000000002c6e2e0900002c6e2e0900002c6e2e09000000002c6e2e09-9faa-a56e-3921-edf7a03a9a3c, ip_hostname=amm000ammamm012059ammamm000ammamm000ammamm012059ammamm012059ammamm000ammamm000ammamm012059ammamm000ammamm000ammamm012059ammamm012059ammamm000ammamm000ammamm012059amm.XYZscisbXYZscisbXYZscisbXYZXYZscisbXYZscisbXYZscisbXYZXYZscisb.ABCisbanABCABCisbanisbanABCABCisbanABCABCABCABC.corp, timestamp=1529668621142}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
    ... 11 more

Solution

  • You can turn on the DEBUG logging level for the org.springframework.integration category and trace how the message travels.

    There is also the Message History pattern, which tracks the message path in the headers. With it you can see which channels your message has passed through and where it gets stuck with Dispatcher has no subscribers.

    However, you would need to restart the MessageHistoryConfigurer after each IntegrationFlow registration.

    Feel free to raise a JIRA on the matter so that dynamic flows can be tracked by Message History.
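
    As a minimal sketch of the DEBUG logging suggestion, assuming the application takes its logging levels from Spring Boot's application.properties (an assumption: the question does not show its logging setup, and a custom Logback or Log4j configuration would need the equivalent logger entry instead):

```properties
# Assumption: levels are configured through Spring Boot's application.properties.
# Raises the whole Spring Integration category to DEBUG so channel sends are logged.
logging.level.org.springframework.integration=DEBUG
```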

    UPDATE

    OK. Given your simple code like this:

    TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
    TcpNetClientConnectionFactory cf = getclientConnectionFactory(host,port,headBytes);
    adapter.setConnectionFactory(cf);
    adapter.setClientMode(true);
    adapter.setErrorChannelName("errorChannel");
    adapter.setRetryInterval(retryInterval);
    adapter.setOutputChannel(fromTcp());
    
    IntegrationFlow flow = IntegrationFlows.from(adapter).get();
    

    Given that there are no more components after this one (and the stack trace confirms that), I assume that you are using a Spring Integration Java DSL version without the fix that extracts the outputChannel from the provided MessageProducer.getOutputChannel().

    To fix that, I suggest modifying the code like this:

    IntegrationFlow flow = IntegrationFlows.from(adapter).channel(fromTcp()).get();
    

    That is, move the channel reference from the TcpReceivingChannelAdapter definition to the flow. Or just try upgrading to the latest version, 1.2.3, of the spring-integration-java-dsl dependency!

    UPDATE 2

    Another thought about the code we have so far.

    You register the TcpReceivingChannelAdapter from the CommandLineRunner. This way the IntegrationFlow is started automatically and is ready to receive data over TCP, but at that moment the @ServiceActivator(inputChannel = "fromTcp") has not been started yet to consume messages from that channel. That's how you get Dispatcher has no subscribers. Either mark your this.flowContext.registration(flow) with autoStartup(false) and let it be started later, during the normal auto-startup phase together with the service activator, or do not use the CommandLineRunner approach for such logic: it is really too early to start any activity from there.
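
    The ordering problem described above can be illustrated with a small toy model in plain Java. This is only a sketch: ToyDirectChannel and ToyInboundAdapter are hypothetical stand-ins invented for this example, not Spring Integration classes, and they only mimic the idea that a direct channel rejects a message when no subscriber is attached yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class StartupOrderSketch {

    /** Hypothetical stand-in for a direct channel: sending fails when nobody is subscribed. */
    static class ToyDirectChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String payload) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException("Dispatcher has no subscribers");
            }
            // Unicast delivery: hand the payload to the first subscriber only.
            subscribers.get(0).accept(payload);
        }
    }

    /** Hypothetical stand-in for an inbound adapter that forwards data once it is started. */
    static class ToyInboundAdapter {
        private final ToyDirectChannel output;
        private boolean running;

        ToyInboundAdapter(ToyDirectChannel output) {
            this.output = output;
        }

        void start() {
            running = true;
        }

        void onMessage(String payload) {
            if (running) {
                output.send(payload);
            }
        }
    }

    /** The adapter is started and receives data before any consumer is subscribed. */
    public static String deliverBeforeSubscribe() {
        ToyDirectChannel fromTcp = new ToyDirectChannel();
        ToyInboundAdapter adapter = new ToyInboundAdapter(fromTcp);
        adapter.start();
        try {
            adapter.onMessage("f0f8");
            return "delivered";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    /** The consumer subscribes first; only then is the adapter started. */
    public static List<String> deliverAfterSubscribe() {
        ToyDirectChannel fromTcp = new ToyDirectChannel();
        ToyInboundAdapter adapter = new ToyInboundAdapter(fromTcp);
        List<String> received = new ArrayList<>();
        fromTcp.subscribe(received::add);
        adapter.start();
        adapter.onMessage("f0f8");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(deliverBeforeSubscribe()); // prints "Dispatcher has no subscribers"
        System.out.println(deliverAfterSubscribe());  // prints "[f0f8]"
    }
}
```

    In the toy model the only difference between the two runs is whether the consumer is attached before the producer starts forwarding, which is the effect that deferring the flow start with autoStartup(false) is meant to achieve in the real application.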