Search code examples
javaspringsocketstcpspring-integration

Spring Integration and TCP server socket - how can I send a message to a client?


I'm trying to create a server in Spring that's listening on a TCP port and accepts connections. I know how to route incoming requests to my service, and it can respond to those. However I would like to send messages to certain clients without any request received. For example, sometimes I have to inform a client about that it has got a message.

To do this, I think I need a way to identify the clients, e.g. by letting them log in. Is there a way to have a "session" object for each active connection in which I can store login data?

How could I send a message to a client which has logged in with username X?

Is this possible in Spring at all?


Solution

  • Starting with version 3.0; the frameworks now emits connection events when there are connection state changes. You can capture these events using an ApplicationListener, or using an <event:inbound-channel-adapter/>.

    The TcpConnectionOpenEvent contains a connectionId; you can send arbitrary messages to any connection once you know its id, by populating the IpHeaders.connectionId header (ip_connectionId) in a message and sending it to a <tcp:outbound-channel-adapter/>.

    If you need to support request/reply as well as sending arbitrary messages, you need to use a collaborating pair of channel adapters for all communication, not a gateway.

    EDIT

    Here's a simple Boot app...

    package com.example;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.Socket;
    
    import javax.net.SocketFactory;
    
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.builder.SpringApplicationBuilder;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.channel.QueueChannel;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.ip.IpHeaders;
    import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
    import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
    import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent;
    import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
    import org.springframework.integration.ip.tcp.connection.TcpServerConnectionFactory;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    
    @SpringBootApplication
    public class So25102101Application {
    
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = new SpringApplicationBuilder(So25102101Application.class)
                    .web(false)
                    .run(args);
            int port = context.getBean(TcpServerConnectionFactory.class).getPort();
            Socket socket = SocketFactory.getDefault().createSocket("localhost", port);
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String line = reader.readLine();
            System.out.println(line);
            context.close();
        }
    
        @Bean
        public TcpReceivingChannelAdapter server(TcpNetServerConnectionFactory cf) {
            TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
            adapter.setConnectionFactory(cf);
            adapter.setOutputChannel(inputChannel());
            return adapter;
        }
    
        @Bean
        public MessageChannel inputChannel() {
            return new QueueChannel();
        }
    
        @Bean
        public MessageChannel outputChannel() {
            return new DirectChannel();
        }
    
        @Bean
        public TcpNetServerConnectionFactory cf() {
            return new TcpNetServerConnectionFactory(0);
        }
    
        @Bean
        public IntegrationFlow outbound() {
            return IntegrationFlows.from(outputChannel())
                    .handle(sender())
                    .get();
        }
    
        @Bean
        public MessageHandler sender() {
            TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
            tcpSendingMessageHandler.setConnectionFactory(cf());
            return tcpSendingMessageHandler;
        }
    
        @Bean
        public ApplicationListener<TcpConnectionOpenEvent> listener() {
            return new ApplicationListener<TcpConnectionOpenEvent>() {
    
                @Override
                public void onApplicationEvent(TcpConnectionOpenEvent event) {
                    outputChannel().send(MessageBuilder.withPayload("foo")
                            .setHeader(IpHeaders.CONNECTION_ID, event.getConnectionId())
                            .build());
                }
    
            };
        }
    
    }
    

    pom deps:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-ip</artifactId>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>