Search code examples
springsocketsspring-integrationtcplistener

Unexpected message - no endpoint registered with connection interceptor while communicating with multiple servers


    @Configuration
@Component
public class GatewayAqrConfig {

    @Autowired
    ConnectorService connectorService;

    @Autowired
    MasterService masterService;

    private HashMap<ConnectorPK, GatewayAqr> connectorMap;

    @Bean
    @Scope(value = "prototype")
    public AbstractClientConnectionFactory clientCF(Connector connector , Master master) {
        TcpNetClientConnectionFactory clientConnectionFactory = new TcpNetClientConnectionFactory(connector.getAqrIpAddr(), connector.getAqrIpPortNo());
        clientConnectionFactory.setSingleUse(false);        
        MyByteArraySerializer obj = new MyByteArraySerializer(master.getAqrMsgHeaderLength(), master.getAqrId());
        clientConnectionFactory.setSerializer(obj);
        clientConnectionFactory.setDeserializer(obj);       
        clientConnectionFactory.setSoKeepAlive(true);
        TcpMessageMapper tcpMessageMapper = new TcpMessageMapper();
        tcpMessageMapper.setCharset("ISO-8859-1");
        clientConnectionFactory.setMapper(tcpMessageMapper);        
        clientConnectionFactory.setBeanName(connector.getAqrIpAddr() + ":" + connector.getAqrIpPortNo());               
        clientConnectionFactory.afterPropertiesSet();
        clientConnectionFactory.start();        
        return clientConnectionFactory;
    }

    @Bean
    @Scope(value = "prototype")
    public TcpSendingMessageHandler tcpOutGateway(AbstractClientConnectionFactory connectionFactory) {      
        TcpSendingMessageHandler messageHandler = new TcpSendingMessageHandler();
        messageHandler.setConnectionFactory(connectionFactory);     
        messageHandler.setClientMode(true);
        messageHandler.setTaskScheduler(getTaskScheduler());
        messageHandler.setStatsEnabled(true);
        messageHandler.afterPropertiesSet();
        messageHandler.start();     
        return messageHandler;
    }

    @Bean
    @Scope(value = "prototype")
    public TcpReceivingChannelAdapter tcpInGateway(AbstractClientConnectionFactory connectionFactory) {
        TcpReceivingChannelAdapter messageHandler = new TcpReceivingChannelAdapter();
        messageHandler.setConnectionFactory(connectionFactory);             
        messageHandler.setClientMode(true);
        messageHandler.setOutputChannel(receive());
        messageHandler.setAutoStartup(true);
        messageHandler.setTaskScheduler(getTaskScheduler());
        messageHandler.afterPropertiesSet();
        messageHandler.start();
        return messageHandler;
    }

    @Bean
    @Scope(value = "prototype")
    public TaskScheduler getTaskScheduler() {
        TaskScheduler ts = new ThreadPoolTaskScheduler();
        return ts;
    }

    @Bean
    public MessageChannel receive() {
        QueueChannel channel = new QueueChannel();      
        return channel;
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() { 
        return new PollerMetadata();
    }

    @Bean
    @Transactional
    public HashMap<ConnectorPK, GatewayAqr> gatewayAqr() throws Exception {
        connectorMap = new HashMap();
        Connector connector = null;
        ConnectorPK connectorPK = null;
        Master master = null;
        TcpConnectionSupport connectionSupport = null;

        // 1. Get List of Connections configured in Database
        List<Connector> connectors = connectorService.getConnections();

        if (connectors.size() > 0) {
            for (int i = 0; i < connectors.size(); i++) {

                // 2. Get the connection details
                connector = connectors.get(i);
                connectorPK = aqrConnector.getConnectorpk();
                master = masterService.findById(connectorPK.getAcuirerId());                

                try {
                    // 3. Create object of TcpNetClientConnectionFactory for each Acquirer connection
                    AbstractClientConnectionFactory clientConnectionFactory = clientCF(aqrConnector, aqrMaster);                    

                    // 4. Create TcpSendingMessageHandler for the Connection
                    TcpSendingMessageHandler outHandler = tcpOutGateway(clientConnectionFactory);                   

                    // 5. Create TcpReceivingChannelAdapter object for the Connection and assign it to receive channel
                    TcpReceivingChannelAdapter inHandler = tcpInGateway(clientConnectionFactory);

                    // 6. Generate the GatewayAqr object
                    GatewayAqr gatewayAqr = new GatewayAqr(clientConnectionFactory, outHandler, inHandler);

                    // 7. Put in the MAP acuirerPK and Send MessageHandler object
                    connectorMap.put(aqrConnectorPK, gatewayAquirer);                   
                } catch (Exception e) {
                }

            } // for
        } // if
        return connectorMap;
    }
}

*********************************************************************************************************************************
@EnableIntegration
@IntegrationComponentScan(basePackageClasses = {GatewayEventConfig.class,GatewayAqrConfig.class })
@Configuration
@ComponentScan(basePackages = {"com.iz.zw.gateway.impl", "com.iz.zw.configuration"})
@Import({GatewayEventConfig.class,GatewayAquirerConfig.class})
public class GatewayConfig {

    @Autowired
    private GatewayAsyncReply<Object, Message<?>> gatewayAsyncReply;

    @Autowired
    private GatewayCorrelationStrategy gatewayCorrelationStrategy;

    @Autowired
    private HashMap<ConnectorPK, GatewayAqr> gatewayAqrs;

    @Autowired
    ConnectorService connectorService;

    @Autowired
    GatewayResponseDeserializer gatewayResponseDeserializer;    

    @MessagingGateway(defaultRequestChannel = "send")
    public interface Gateway {
        void waitForResponse(TransactionMessage transaction);
    }

    @Bean
    public MessageChannel send() {
        DirectChannel channel = new DirectChannel();        
        return channel;
    }

    @Bean
    @ServiceActivator(inputChannel = "send")
    public BarrierMessageHandlerWithLateGoodResponse barrier() {
        BarrierMessageHandlerWithLateGoodResponse barrier = new BarrierMessageHandlerWithLateGoodResponse(25000, this.gatewayCorrelationStrategy);      
        barrier.setAsync(true);
        barrier.setOutputChannel(out());
        barrier.setDiscardChannel(lateGoodresponseChannel());
        return barrier;
    }

    @ServiceActivator(inputChannel = "out")
    public void printMessage(Message<?> message) {
        System.out.println("in out channel");
    }

    @Transformer(inputChannel = "receive", outputChannel = "process")
    public TransactionMessage convert(byte[] response) {    

        logger.debug("Response Received", Arrays.toString(response));
        TransactionMessage transactionMessage = gatewayResponseDeserializer.deserializeResponse(response);      
        System.out.println("Response : " + response);       
        return transactionMessage;
    }

    @ServiceActivator(inputChannel = "process")
    @Bean
    public MessageHandler releaser() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                try {               
                    gatewayAsyncReply.put(message);             
                    barrier().trigger(message);
                } catch (GatewayLateGoodMessageException exception) {                   
                    System.out.println("Late good response..!");
                    gatewayAsyncReply.get(message);
                    lateGoodresponseChannel().send(message);
                }                               
            }
        };
    }   

    @Bean
    public MessageChannel process() {   
        QueueChannel channel = new QueueChannel();
        return channel;
    }

    @Bean
    public MessageChannel out() {
        DirectChannel channel = new DirectChannel();        
        return channel;
    }

    @Bean
    public MessageChannel lateGoodresponseChannel() {
        QueueChannel channel = new QueueChannel();
        return channel;
    }

    @ServiceActivator(inputChannel="lateGoodresponseChannel")
    public void handleLateGoodResponse(Message<?> message) {
        String strSTAN = null;
        String strResponse = null;
        Message<?> respMessage = null;

        if(message instanceof TransactionMessage){
            strSTAN = ((TransactionMessage)message).getStan();
            respMessage = gatewayAsyncReply.get(strSTAN);

            if (null != respMessage) {              
                strResponse = (String) message.getPayload();                
            }
        }               
        logger.info("Late Good Response: " + strResponse);      
    }
}

*********************************************************************************************************************************

@Configuration
public class GatewayEventConfig {

    private static final Logger logger = LoggerFactory.getLogger(GatewayEventConfig.class);

    @Bean
    public ApplicationEventListeningMessageProducer tcpEventListener() {
        ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
        producer.setEventTypes(new Class[] {TcpConnectionOpenEvent.class, TcpConnectionCloseEvent.class, TcpConnectionExceptionEvent.class});
        producer.setOutputChannel(tcpEventChannel());
        producer.setAutoStartup(true);
        producer.setTaskScheduler(getEventTaskScheduler());
        producer.start();
        return producer;
    }

    @Bean
    public TaskScheduler getEventTaskScheduler() {
        TaskScheduler ts = new ThreadPoolTaskScheduler();
        return ts;
    }

    @Bean
    public MessageChannel tcpEventChannel() {
        return new QueueChannel();
    }

    @Transactional
    @ServiceActivator(inputChannel = "tcpEventChannel")
    public void tcpConnectionEvent(TcpConnectionEvent event) {

        System.out.println("In publishing" + event.toString());
        String strConnectionFactory = event.getConnectionFactoryName();

        if (strConnectionFactory.equals("connection1")) {            
                //send some message to connector            
        } else {
            // send message to another connector
        }
    }
}

this is my configuration files, my application tries to connect to 2 servers as soon as it starts. I have made 2 configurations for 2 servers as above class GatewayAqrConfig1 and GatewayConfig1 classes are used for first server connection GatewayAqrConfig2 and GatewayConfig2 classes are used for second server connection Using event I am connecting to server and sending a connection set up message, if server is already started and If I have started my application, it gets the event, connects and sends the message but I am not getting the response instead I am getting the WARNING as below

**WARN  TcpNetConnection:186 - Unexpected message - no endpoint registered with connection interceptor:**
    i.e connection does not registers the listener properly

but if I am starting my application first and then servers I am getting responses perfectly, As I am connecting to servers I could not restart it ? My application should connect to server which is already started ? what could be the problem ?

Version used:

Spring integration Version : 4.3.1

Spring version : 4.3.2

JDK 1.8 on JBOSS EAP 7


Solution

  • That WARN message means that, somehow, an inbound message was received without a TcpReceivingChannelAdapter having been registered with the connection factory. Client mode should make no difference.

    Having looked at your code a little more, the prototype beans should be ok, as long as you use those objects (especially TcpMessageHandler directly rather than via the framework).

    It's not obvious to me how that can happen, given your configuration; the listener is registered when you call setConnectionFactory on the receiving adapter.

    If you can reproduce it with a trimmed-down project and post it someplace, I will take a look.