Search code examples
spring-integrationspring-integration-dsl

How to config tcp server to receive data from multiple client using spring boot?


I would like to configure TCP server to receive and reply data from multiple clients. I searched many other thread but could not found exact way to do. I'm using spring integration first time and have no experience.

Server requirement

  1. should be able to receive and reply data to specific client (can have multiple client, each client should processed separately)
  2. should be able to send data to client and wait for response for specific timeout.
  3. Should be able to detect client is disconnect or not. if Client is disconnect then connection should be closed to save memory. (In earlier method without spring integration I was able to do it by ping client and see sending is failed or not but don't know how to do with spring integration)

I tried below code, In which I'm able to send data to client but could achieve my above requirements

TCP Server Configuration:

@Configuration
public class TcpServerConfig {

    private List<TcpConnectionOpenEvent> clientList = new ArrayList<>();

    public List<TcpConnectionOpenEvent> getClientList() {
        return clientList;
    }

    @Bean
    public TcpReceivingChannelAdapter server(TcpNetServerConnectionFactory cf) {
        TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(cf);
        adapter.setOutputChannel(inputChannel());
        return adapter;
    }

    @Bean
    public MessageChannel inputChannel() {
        return new QueueChannel();
    }

    @Bean
    public MessageChannel outputChannel() {
        return new DirectChannel();
    }

    @Bean
    public TcpNetServerConnectionFactory cf() {
        return new TcpNetServerConnectionFactory(1001);
    }

    @Bean
    public IntegrationFlow outbound() {
        return IntegrationFlows.from(outputChannel())
                .handle(sender())
                .get();
    }

    @Bean
    public MessageHandler sender() {
        TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
        tcpSendingMessageHandler.setConnectionFactory(cf());
        return tcpSendingMessageHandler;
    }

    @Bean
    public ApplicationListener<TcpConnectionOpenEvent> listener() {
        return new ApplicationListener<TcpConnectionOpenEvent>() {

            @Override
            public void onApplicationEvent(TcpConnectionOpenEvent event) {
                outputChannel().send(MessageBuilder.withPayload("foo")
                        .setHeader(IpHeaders.CONNECTION_ID, event.getConnectionId())
                        .build());

                clientList.add(event);
            }
        };
    }
}

Test Code:

@Service
public class Test {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageServiceImpl.class);

    @Autowired
    TcpServerConfig tcpServerConfig;

    @Autowired
    private MessageChannel outputChannel;

    @Autowired
    private MessageChannel inputChannel;

    @Scheduled(fixedRate = 1000)
    void task() {
            LOGGER.info("Client count: " + tcpServerConfig.getClientList().size());

            for (TcpConnectionOpenEvent client : tcpServerConfig.getClientList()) {
                outputChannel.send(MessageBuilder.withPayload("foo")
                        .setHeader(IpHeaders.CONNECTION_ID, client.getConnectionId())
                        .build());
            }
        }
}

Any help would be appreciated.


Solution

  • Here is one solution:

    @SpringBootApplication
    @EnableScheduling
    public class So62877512ServerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(So62877512ServerApplication.class, args);
        }
    
        @Bean
        public IntegrationFlow serverIn(Handler handler) {
            return IntegrationFlows.from(Tcp.inboundAdapter(server()))
                    .transform(Transformers.objectToString())
                    .filter(handler, "existingConnection", spec -> spec
                            .discardFlow(f -> f
                                    .handle(handler, "sendInitialReply")))
                    .handle(handler, "reply")
                    .get();
        }
    
        @Bean
        public IntegrationFlow serverOut() {
            return f -> f.handle(Tcp.outboundAdapter(server()));
        }
    
        @Bean
        public TcpServerConnectionFactorySpec server() {
            return Tcp.netServer(1234)
                    .serializer(TcpCodecs.lf())
                    .deserializer(TcpCodecs.lf()); // compatible with netcat
        }
    
    }
    
    @Component
    @DependsOn("serverOut")
    class Handler {
    
        private static final Logger LOG = LoggerFactory.getLogger(Handler.class);
    
        private final ConcurrentMap<String, BlockingQueue<Message<?>>> clients = new ConcurrentHashMap<>();
    
        private final MessageChannel out;
    
        private final TcpNetServerConnectionFactory server;
    
        public Handler(@Qualifier("serverOut.input") MessageChannel out, TcpNetServerConnectionFactory server) {
            this.out = out;
            this.server = server;
        }
    
        public boolean existingConnection(Message<?> message) {
            String connectionId = message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class);
            boolean containsKey = this.clients.containsKey(connectionId);
            if (!containsKey) {
                this.clients.put(connectionId, new LinkedBlockingQueue<Message<?>>());
            }
            return containsKey;
        }
    
        public void sendInitialReply(Message<String> message) {
            LOG.info("Replying to " + message.getPayload());
            this.out.send(MessageBuilder.withPayload(message.getPayload().toUpperCase())
                    .copyHeaders(message.getHeaders()).build());
        }
    
        @Scheduled(fixedDelay = 5000)
        public void sender() {
            this.clients.forEach((key, queue) -> {
                try {
                    this.out.send(MessageBuilder.withPayload("foo")
                            .setHeader(IpHeaders.CONNECTION_ID, key).build());
                    Message<?> reply = queue.poll(10, TimeUnit.SECONDS);
                    if (reply == null) {
                        LOG.error("Timeout waiting for " + key);
                        this.server.closeConnection(key);
                    }
                    else {
                        LOG.info("Reply " + reply.getPayload() + " from " + key);
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    LOG.error("Interrupted");
                }
                catch (Exception e) {
                    LOG.error("Failed to send to " + key, e);
                }
            });
        }
    
        public void reply(Message<String> in) {
            BlockingQueue<Message<?>> queue = this.clients.get(in.getHeaders().get(IpHeaders.CONNECTION_ID, String.class));
            if (queue != null) {
                queue.add(in);
            }
        }
    
        @EventListener
        public void closed(TcpConnectionCloseEvent event) {
            this.clients.remove(event.getConnectionId());
            LOG.info(event.getConnectionId() + " closed");
        }
    
    }
    
    $ nc localhost 1234
    foo <- typed
    FOO
    foo
    bar <- typed
    foo
    bar <- typed
    foo
    
    $ <- closed by server - timeout
    
    2020-07-14 14:41:04.906  INFO 64763 --- [pool-1-thread-2] com.example.demo.Handler                 : Replying to foo
    2020-07-14 14:41:13.841  INFO 64763 --- [   scheduling-1] com.example.demo.Handler                 : Reply bar from localhost:65115:1234:a9fc7e3d-4dda-4627-b765-4f0bb0835153
    2020-07-14 14:41:21.465  INFO 64763 --- [   scheduling-1] com.example.demo.Handler                 : Reply bar from localhost:65115:1234:a9fc7e3d-4dda-4627-b765-4f0bb0835153
    2020-07-14 14:41:36.473 ERROR 64763 --- [   scheduling-1] com.example.demo.Handler                 : Timeout waiting for localhost:65115:1234:a9fc7e3d-4dda-4627-b765-4f0bb0835153
    2020-07-14 14:41:36.474  INFO 64763 --- [   scheduling-1] com.example.demo.Handler                 : localhost:65115:1234:a9fc7e3d-4dda-4627-b765-4f0bb0835153 closed