Search code examples
spring-integrationspring-integration-dsl

Spring Integration TCP Inbound Adapter client disconnect issue


We have created an IntegrationFlow for an inbound TCP server. Whenever we test this the integration is fine until the client disconnects, then we get a continuous flow of empty messages.

Spring Integration 5.2.5.RELEASE

Flow:

@Bean
public IntegrationFlow inboundFlow(MyService myService) {
   return IntegrationFlows.from(
       Tcp.inboundAdapter(
           Tcp.netServer(12345)
               .deserializer(TcpCodecs.lengthHeader4())))
       .transform(Transformers.objectToString())
       .log(INFO) // Logging only for testing 
       .handle(myService, "process") // Do some processing of the message
       .get();
}

Test method:

public void sendPackets() throws Exception {
    List<Path> files = getAllFilesSorted("/some/location");
    try (Socket socket = new Socket("localhost", 12345)) {
        log.info("local port: {}", socket.getLocalPort());
        try (OutputStream outputStream = socket.getOutputStream()) {
            for (int i = 0; i < 10; i++) {
                Path path = files.get(i);
                log.info("{} ({} of {})", path.toString(), i, files.size());
                byte[] bytes = Files.readAllBytes(path);
                outputStream.write(bytes);
                outputStream.flush();
                Thread.sleep(200);
            }
        }
    }
}

Partial Log after test completes:

2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=813ccf6b-9eb7-2ff3-0f8d-f2810966d8d7, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=1fd5c4c7-b2e0-6a89-7de6-894c8e42e242, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=78b852e8-0094-f8b9-8d05-37bc4912d096, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=4e4a7ee0-c66e-9501-9ef7-4d25143531b3, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=c90abe17-7036-94cc-af59-3b0cac5e75e4, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=425d2e2a-75c5-0184-06da-6d27eb546931, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=4c55d1d0-7421-1832-23ca-03744419c847, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=05b6fea6-8f76-5c74-e4cc-33bd8099d8e8, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=9b8fe16c-07f4-b64b-faec-794eb743559c, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=222725e8-19c6-62b6-8ba7-103892191c96, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=951403b6-18a9-ffe9-29b7-67b8c01ea311, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=2ac3002b-ccd1-9506-f3aa-60f4b63e26b8, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=dbbf12cf-0a27-407b-0a96-5b27ab0539db, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=3282d09e-9508-b053-3763-cc23d3c817a8, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.966  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=549b2326-341f-19c0-db98-6b31da1901d8, ip_hostname=localhost, timestamp=1588672852966}]

Solution

  • There is nothing in the log about the client disconnecting. We would see socket close activity in the log.

    The log implies we are receiving packets containing 0x00000000.

    Use wireshark or similar to look at the activity on the wire.

    EDIT

    Your custom deserializer is causing the problem.

            if (read < 0) {
                return 0;
            }
    

    You need to throw a SoftEndOfStreamException when you detect a socket close.

    This is explained in the documentation.

    When the deserializer detects a closed input stream between messages, it must throw a SoftEndOfStreamException; this is a signal to the framework to indicate that the close was "normal". If the stream is closed while decoding a message, some other exception should be thrown instead.