spring-integration

TCP Caching Connection not working as expected


Could someone please clarify how the CachingClientConnectionFactory works?

Here’s what I expected:

When I create a CachingClientConnectionFactory with a pool of 100 TCP connections, I thought I could send a message to the pool, and any available TCP connection would handle sending it.

However, what actually happens is that a TCP connection sends the message and then appears to stay busy. My understanding is that once a connection has sent its message it is free again, so it should be available to send the remaining messages. That is clearly not what I see.

For example, if I have a pool of 100 TCP connections and I try to send 101 messages, the TCP server receives 100 of them, but the 101st message fails with an exception like this:

Exception in thread "main" org.springframework.messaging.MessageHandlingException: Failed to obtain a connection in the [bean 'org.springframework.integration.ip.tcp.TcpSendingMessageHandler#0' for component 'outAdapter.client'], failedMessage=GenericMessage [payload=Hello World 100, headers={id=ccdd6592-da82-cea6-d7ac-ea62a2b42776, timestamp=1724721768408}]
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.obtainConnection(TcpSendingMessageHandler.java:94)
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.doWrite(TcpSendingMessageHandler.java:180)
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageAsClient(TcpSendingMessageHandler.java:150)
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:110)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:332)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302)
    at org.springframework.integration.samples.tcpclientserver.Main.main(Main.java:23)
Caused by: org.springframework.integration.util.PoolItemNotAvailableException: Timed out while waiting to acquire a pool entry.
    at org.springframework.integration.util.SimplePool.getItem(SimplePool.java:202)
    at org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory.obtainConnection(CachingClientConnectionFactory.java:144)
    at org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory.getConnection(AbstractClientConnectionFactory.java:121)
    at org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory.getConnection(AbstractClientConnectionFactory.java:42)
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.obtainConnection(TcpSendingMessageHandler.java:90)
    ... 13 more

Context:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:beans="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://www.springframework.org/schema/integration"
    xmlns:ip="http://www.springframework.org/schema/integration/ip"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/integration/ip https://www.springframework.org/schema/integration/ip/spring-integration-ip.xsd
        http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd">

    <beans:description>
        Uses conversion service and collaborating channel adapters.
    </beans:description>

    <context:property-placeholder />

    <converter>
        <beans:bean class="org.springframework.integration.samples.tcpclientserver.ByteArrayToStringConverter" />
    </converter>

    <beans:bean id="fastestWireFormatSerializer" class="org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer">
    </beans:bean>

    <beans:bean id="runMethod" class="org.springframework.integration.samples.tcpclientserver.Main">
    </beans:bean>

    <!-- Client side -->
    <ip:tcp-connection-factory id="clientConnFactory"
        type="client"
        host="localhost"
        port="8080"
        single-use="false"
        serializer="fastestWireFormatSerializer"
        deserializer="fastestWireFormatSerializer"
        so-timeout="10000" />

    <beans:bean id="cacheTcp" class="org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory">
        <beans:constructor-arg ref="clientConnFactory" />
        <beans:constructor-arg value="10" />
        <beans:property name="connectionWaitTimeout" value="1200"/>
    </beans:bean>

    <channel id="inbound" />
    <channel id="outbound" />

    <ip:tcp-outbound-channel-adapter id="outAdapter.client"
        channel="outbound"
        auto-startup="true"
        connection-factory="cacheTcp" />

    <!-- Async receive reply -->
    <ip:tcp-inbound-channel-adapter id="inAdapter.client"
        channel="inbound"
        auto-startup="true"
        connection-factory="cacheTcp" />

    <service-activator input-channel="inbound" ref="runMethod" method="run" />
</beans:beans>

Here is the main method:

package org.springframework.integration.samples.tcpclientserver;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.ip.config.TcpOutboundChannelAdapterParser;
import org.springframework.messaging.support.GenericMessage;

import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Starting server...");
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("/META-INF/spring" +
                "/integration" +
                "/tcpClientServerDemo-conversion-context.xml");

        DirectChannel dc= context.getBean("outbound", DirectChannel.class);
        int i = 0;
        while (true) {
            dc.send(new GenericMessage<>("Hello World \r\n" + i++));
            Thread.sleep(10);
        }
    }

    public static void run(String resp) {
        System.out.println("REPLY RECEIVED --- "+resp);
    }
}

Here are the logs when I run the app:

Starting server...
REPLY RECEIVED --- Hello World 
REPLY RECEIVED --- 0 echo
REPLY RECEIVED --- Hello World 
REPLY RECEIVED --- 1 echo
REPLY RECEIVED --- Hello World 
REPLY RECEIVED --- 2 echo
REPLY RECEIVED --- Hello World 
REPLY RECEIVED --- 3 echo
REPLY RECEIVED --- Hello World 
REPLY RECEIVED --- 4 echo
REPLY RECEIVED --- Hello World 
REPLY RECEIVED --- 5 echo
REPLY RECEIVED --- Hello World 
REPLY RECEIVED --- 6 echo
REPLY RECEIVED --- Hello World 
REPLY RECEIVED --- 7 echo
REPLY RECEIVED --- Hello World 
REPLY RECEIVED --- 8 echo
REPLY RECEIVED --- Hello World 
REPLY RECEIVED --- 9 echo
20:55:19.271 ERROR [main][org.springframework.integration.ip.tcp.TcpSendingMessageHandler] Error creating connection
org.springframework.integration.util.PoolItemNotAvailableException: Timed out while waiting to acquire a pool entry.
    at org.springframework.integration.util.SimplePool.getItem(SimplePool.java:202) ~[spring-integration-core-6.2.0.jar:6.2.0]
    at org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory.obtainConnection(CachingClientConnectionFactory.java:144) ~[spring-integration-ip-6.2.0.jar:6.2.0]
    at org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory.getConnection(AbstractClientConnectionFactory.java:121) ~[spring-integration-ip-6.2.0.jar:6.2.0]
    at org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory.getConnection(AbstractClientConnectionFactory.java:42) ~[spring-integration-ip-6.2.0.jar:6.2.0]
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.obtainConnection(TcpSendingMessageHandler.java:90) [spring-integration-ip-6.2.0.jar:6.2.0]
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.doWrite(TcpSendingMessageHandler.java:180) [spring-integration-ip-6.2.0.jar:6.2.0]
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageAsClient(TcpSendingMessageHandler.java:150) [spring-integration-ip-6.2.0.jar:6.2.0]
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:110) [spring-integration-ip-6.2.0.jar:6.2.0]
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) [spring-integration-core-6.2.0.jar:6.2.0]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) [spring-integration-core-6.2.0.jar:6.2.0]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132) [spring-integration-core-6.2.0.jar:6.2.0]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) [spring-integration-core-6.2.0.jar:6.2.0]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) [spring-integration-core-6.2.0.jar:6.2.0]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) [spring-integration-core-6.2.0.jar:6.2.0]
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378) [spring-integration-core-6.2.0.jar:6.2.0]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:332) [spring-integration-core-6.2.0.jar:6.2.0]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302) [spring-integration-core-6.2.0.jar:6.2.0]
    at org.springframework.integration.samples.tcpclientserver.Main.main(Main.java:23) [classes/:?]
Exception in thread "main" org.springframework.messaging.MessageHandlingException: Failed to obtain a connection in the [bean 'org.springframework.integration.ip.tcp.TcpSendingMessageHandler#0' for component 'outAdapter.client'], failedMessage=GenericMessage [payload=Hello World 
10, headers={id=d503b07d-de6e-515b-799f-f3445e7776b2, timestamp=1724892918058}]
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.obtainConnection(TcpSendingMessageHandler.java:94)
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.doWrite(TcpSendingMessageHandler.java:180)
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageAsClient(TcpSendingMessageHandler.java:150)
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:110)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:332)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302)
    at org.springframework.integration.samples.tcpclientserver.Main.main(Main.java:23)
Caused by: org.springframework.integration.util.PoolItemNotAvailableException: Timed out while waiting to acquire a pool entry.
    at org.springframework.integration.util.SimplePool.getItem(SimplePool.java:202)
    at org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory.obtainConnection(CachingClientConnectionFactory.java:144)
    at org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory.getConnection(AbstractClientConnectionFactory.java:121)
    at org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory.getConnection(AbstractClientConnectionFactory.java:42)
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.obtainConnection(TcpSendingMessageHandler.java:90)
    ... 13 more


Here is the server-side TCP implementation:

const net = require("net")

// Create a TCP server
const server = net.createServer((socket) => {
    console.log("Client connected")

    // When data is received from the client
    socket.on("data", (data) => {
        const receivedMessage = data.toString().trim()
        console.log(`Received: ${receivedMessage}`)

        const responseMessage = `${receivedMessage} echo`
        socket.write(responseMessage + "\r\n")
    })

    // When the client disconnects
    socket.on("end", () => {
        console.log("Client disconnected")
    })

    // Handle errors
    socket.on("error", (err) => {
        console.error(`Error: ${err.message}`)
    })
})

// Start the server on port 8080
server.listen(8080, () => {
    console.log("Server listening on port 8080")
})

To run it: node server.js

Here is the server log:

q@qs-MacBook-Pro delete_iiit % node server.js
Server listening on port 8080
Client connected
Received: Hello World 
0
Client connected
Received: Hello World 
1
Client connected
Received: Hello World 
2
Client connected
Received: Hello World 
3
Client connected
Received: Hello World 
4
Client connected
Received: Hello World 
5
Client connected
Received: Hello World 
6
Client connected
Received: Hello World 
7
Client connected
Received: Hello World 
8
Client connected
Received: Hello World 
9
Client disconnected
Client connected
Received: Hello World 
10
Client disconnected
Client connected
Received: Hello World 
11
Client disconnected
Client connected
Received: Hello World 
12
Client disconnected
Client connected
Received: Hello World 
13
Client disconnected
Client connected
Received: Hello World 
14
Client disconnected
Client connected
Received: Hello World 
15
Client disconnected
Client connected
Received: Hello World 
16
Client disconnected
Client connected
Received: Hello World 
17
Client disconnected
Client connected
Received: Hello World 
18
Client disconnected
Client connected
Received: Hello World 
19
Client disconnected
Client connected
Received: Hello World 
20
Client disconnected
Client connected
Received: Hello World 
21
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client connected
Received: Hello World 
0
Client connected
Received: Hello World 
1
Client connected
Received: Hello World 
2
Client connected
Received: Hello World 
3
Client connected
Received: Hello World 
4
Client connected
Received: Hello World 
5
Client connected
Received: Hello World 
6
Client connected
Received: Hello World 
7
Client connected
Received: Hello World 
8
Client connected
Received: Hello World 
9
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected


Solution

  • The logic in TcpSendingMessageHandler is like this:

        finally {
            if (connection != null && this.isSingleUse
                    && this.clientConnectionFactory.getListener() == null) {
                // if there's no collaborating inbound adapter, close immediately, otherwise
                // it will close after receiving the reply.
                connection.close();
            }
        }
    

    And since you use that <ip:tcp-inbound-channel-adapter>, a reply is expected after sending. It looks like no reply comes from the server, so the connection is never closed and never returned to the cache.

    The CachingClientConnectionFactory does set that flag to true on its target factory:

    public CachingClientConnectionFactory(AbstractClientConnectionFactory target, int poolSize) {
        super("", 0);
        // override single-use to true so the target creates multiple connections
        target.setSingleUse(true);
        this.targetConnectionFactory = target;
        this.pool = new SimplePool<>(poolSize, new TcpConnectionPoolItemCallback(this.targetConnectionFactory));
    }
    

    But you still have to receive a reply for that connection to have it closed properly.
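
    To make the symptom concrete, here is a minimal sketch of a fixed-size pool whose slots only come back when they are explicitly released. It is not the Spring Integration SimplePool: BoundedPool, countSuccessfulSends and the 50 ms wait are hypothetical names and values chosen for illustration. It only shows why, with a pool of 10, the 11th send times out if no connection is ever returned.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class PoolExhaustionSketch {

    /** Hypothetical stand-in for a fixed-size connection pool (not Spring's SimplePool). */
    static class BoundedPool {
        private final Semaphore permits;
        private final long waitTimeoutMs;

        BoundedPool(int size, long waitTimeoutMs) {
            this.permits = new Semaphore(size);
            this.waitTimeoutMs = waitTimeoutMs;
        }

        /** Waits up to waitTimeoutMs for a free slot; returns false on timeout or interrupt. */
        boolean acquire() {
            try {
                return permits.tryAcquire(waitTimeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /** Returns a slot to the pool, as closing a pooled connection would. */
        void release() {
            permits.release();
        }
    }

    /** Counts how many of the attempted sends manage to obtain a slot. */
    static int countSuccessfulSends(int poolSize, int attempts, boolean releaseAfterUse) {
        BoundedPool pool = new BoundedPool(poolSize, 50);
        int successful = 0;
        for (int i = 0; i < attempts; i++) {
            if (pool.acquire()) {
                successful++;
                if (releaseAfterUse) {
                    pool.release(); // the slot becomes available for the next send
                }
            }
        }
        return successful;
    }

    public static void main(String[] args) {
        // Slots never released: only the first 10 sends get a slot.
        System.out.println("never released: " + countSuccessfulSends(10, 11, false));
        // Slots released after each send: all 11 sends get a slot.
        System.out.println("released after use: " + countSuccessfulSends(10, 11, true));
    }
}
```

    In the sketch, the "never released" case corresponds to connections that are never closed and so never go back to the cache.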

    UPDATE

    So, we have a delegation problem in CachingClientConnectionFactory: it checks its own properties when it should be looking at those of the target factory.

    Will fix soon.

    Meanwhile, I don't see any workaround other than not using caching at all.
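
    The kind of delegation problem described in the update can be sketched in plain Java as below. This is an illustration only, not the framework source: TargetFactory, NonDelegatingWrapper and DelegatingWrapper are hypothetical classes, and using the single-use flag as the affected property is an assumption made for the example.

```java
public class DelegationSketch {

    /** Hypothetical stand-in for the wrapped (target) connection factory. */
    static class TargetFactory {
        private boolean singleUse;

        void setSingleUse(boolean singleUse) {
            this.singleUse = singleUse;
        }

        boolean isSingleUse() {
            return singleUse;
        }
    }

    /** Wrapper that configures the target but answers queries from its own field. */
    static class NonDelegatingWrapper {
        private final TargetFactory target;
        private boolean singleUse; // never updated, so it stays false

        NonDelegatingWrapper(TargetFactory target) {
            target.setSingleUse(true); // flag is set on the target only
            this.target = target;
        }

        boolean isSingleUse() {
            return singleUse; // reads the wrapper's own state, not the target's
        }
    }

    /** Wrapper that forwards the query to the target it configured. */
    static class DelegatingWrapper {
        private final TargetFactory target;

        DelegatingWrapper(TargetFactory target) {
            target.setSingleUse(true);
            this.target = target;
        }

        boolean isSingleUse() {
            return target.isSingleUse(); // delegates to the target
        }
    }

    public static void main(String[] args) {
        System.out.println("non-delegating: " + new NonDelegatingWrapper(new TargetFactory()).isSingleUse());
        System.out.println("delegating: " + new DelegatingWrapper(new TargetFactory()).isSingleUse());
    }
}
```

    Any caller that asks the non-delegating wrapper sees false even though the target was switched to true, which is the general shape of mismatch the update describes.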