Search code examples
javaspring-integration-dslspring-integration-ip

How to implement simple echo socket service in Spring Integration DSL


Please,
could you help with implementation of a simple, echo style, Heartbeat TCP socket service in Spring Integration DSL? More precisely how to plug Adapter/Handler/Gateway to IntegrationFlows on the client and server side. Practical examples are hard to come by for Spring Integration DSL and TCP/IP client/server communication.

I think, I nailed most of the code, it's just that bit about plugging everything together in the IntegrationFlow.

There is an sample echo service in SI examples, but it's written in the "old" XML configuration and I really struggle to transform it to the configuration by code.

My Heartbeat service is a simple server waiting for client to ask "status", responding with "OK".

No @ServiceActivator, no @MessageGateways, no proxying, everything explicit and verbose; driven by a plain JDK scheduled executor on client side; server and client in separate configs and projects.

HeartbeatClientConfig

@Configuration
@EnableIntegration
public class HeartbeatClientConfig {

    @Bean
    public MessageChannel outboudChannel() {
        return new DirectChannel();
    }

    @Bean
    public PollableChannel inboundChannel() {
        return new QueueChannel();
    }

    @Bean
    public TcpNetClientConnectionFactory connectionFactory() {
        TcpNetClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", 7777);
        return connectionFactory;
    }

    @Bean
    public TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter(
            TcpNetClientConnectionFactory connectionFactory,
            MessageChannel inboundChannel) {
        TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter = new TcpReceivingChannelAdapter();
        heartbeatReceivingMessageAdapter.setConnectionFactory(connectionFactory);
        heartbeatReceivingMessageAdapter.setOutputChannel(inboundChannel); // ???
        heartbeatReceivingMessageAdapter.setClientMode(true);
        return heartbeatReceivingMessageAdapter;
    }

    @Bean
    public TcpSendingMessageHandler heartbeatSendingMessageHandler(
            TcpNetClientConnectionFactory connectionFactory) {
        TcpSendingMessageHandler heartbeatSendingMessageHandler = new TcpSendingMessageHandler();
        heartbeatSendingMessageHandler.setConnectionFactory(connectionFactory);
        return heartbeatSendingMessageHandler;
    }

    @Bean
    public IntegrationFlow heartbeatClientFlow(
            TcpNetClientConnectionFactory connectionFactory,
            TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter,
            TcpSendingMessageHandler heartbeatSendingMessageHandler,
            MessageChannel outboudChannel) {
        return IntegrationFlows
                .from(outboudChannel) // ??????
                .// adapter ???????????
                .// gateway ???????????
                .// handler ???????????
                .get();
    }

    @Bean
    public HeartbeatClient heartbeatClient(
            MessageChannel outboudChannel,
            PollableChannel inboundChannel) {
        return new HeartbeatClient(outboudChannel, inboundChannel);
    }
}

HeartbeatClient

public class HeartbeatClient {
    private final MessageChannel outboudChannel;
    private final PollableChannel inboundChannel;
    private final Logger log = LogManager.getLogger(HeartbeatClient.class);

    public HeartbeatClient(MessageChannel outboudChannel, PollableChannel inboundChannel) {
        this.inboundChannel = inboundChannel;
        this.outboudChannel = outboudChannel;
    }

    @EventListener
    public void initializaAfterContextIsReady(ContextRefreshedEvent event) {
        log.info("Starting Heartbeat client...");
        start();
    }

    public void start() {
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            while (true) {
                try {
                    log.info("Sending Heartbeat");
                    outboudChannel.send(new GenericMessage<String>("status"));
                    Message<?> message = inboundChannel.receive(1000);
                    if (message == null) {
                        log.error("Heartbeat timeouted");
                    } else {
                        String messageStr = new String((byte[]) message.getPayload());
                        if (messageStr.equals("OK")) {
                            log.info("Heartbeat OK response received");
                        } else {
                            log.error("Unexpected message content from server: " + messageStr);
                        }
                    }
                } catch (Exception e) {
                    log.error(e);
                }
            }
        }, 0, 10000, TimeUnit.SECONDS);
    }
}

HeartbeatServerConfig

@Configuration
@EnableIntegration
public class HeartbeatServerConfig {

    @Bean
    public MessageChannel outboudChannel() {
        return new DirectChannel();
    }

    @Bean
    public PollableChannel inboundChannel() {
        return new QueueChannel();
    }

    @Bean
    public TcpNetServerConnectionFactory connectionFactory() {
        TcpNetServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(7777);
        return connectionFactory;
    }

    @Bean
    public TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter(
            TcpNetServerConnectionFactory connectionFactory,
            MessageChannel outboudChannel) {
        TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter = new TcpReceivingChannelAdapter();
        heartbeatReceivingMessageAdapter.setConnectionFactory(connectionFactory);
        heartbeatReceivingMessageAdapter.setOutputChannel(outboudChannel);
        return heartbeatReceivingMessageAdapter;
    }

    @Bean
    public TcpSendingMessageHandler heartbeatSendingMessageHandler(
            TcpNetServerConnectionFactory connectionFactory) {
        TcpSendingMessageHandler heartbeatSendingMessageHandler = new TcpSendingMessageHandler();
        heartbeatSendingMessageHandler.setConnectionFactory(connectionFactory);
        return heartbeatSendingMessageHandler;
    }

    @Bean
    public IntegrationFlow heartbeatServerFlow(
            TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter,
            TcpSendingMessageHandler heartbeatSendingMessageHandler,
            MessageChannel outboudChannel) {
        return IntegrationFlows
                .from(heartbeatReceivingMessageAdapter) // ???????????????
                .handle(heartbeatSendingMessageHandler) // ???????????????
                .get();
    }

    @Bean
    public HeartbeatServer heartbeatServer(
            PollableChannel inboundChannel, 
            MessageChannel outboudChannel) {
        return new HeartbeatServer(inboundChannel, outboudChannel);
    }
}

HeartbeatServer

public class HeartbeatServer {
    private final PollableChannel inboundChannel;
    private final MessageChannel outboudChannel;
    private final Logger log = LogManager.getLogger(HeartbeatServer.class);

    public HeartbeatServer(PollableChannel inboundChannel, MessageChannel outboudChannel) {
        this.inboundChannel = inboundChannel;
        this.outboudChannel = outboudChannel;
    }

    @EventListener
    public void initializaAfterContextIsReady(ContextRefreshedEvent event) {
        log.info("Starting Heartbeat");
        start();
    }

    public void start() {
        Executors.newSingleThreadExecutor().execute(() -> {
            while (true) {
                try {
                    Message<?> message = inboundChannel.receive(1000);
                    if (message == null) {
                        log.error("Heartbeat timeouted");
                    } else {
                        String messageStr = new String((byte[]) message.getPayload());
                        if (messageStr.equals("status")) {
                            log.info("Heartbeat received");
                            outboudChannel.send(new GenericMessage<>("OK"));
                        } else {
                            log.error("Unexpected message content from client: " + messageStr);
                        }
                    }
                } catch (Exception e) {
                    log.error(e);
                }
            }
        });
    }
}

Bonus question

Why channel can be set on TcpReceivingChannelAdapter (inbound adapter) but not TcpSendingMessageHandler (outbound adapter)?

UPDATE
Here is the full project source code if anyone is interested for anyone to git clone it:
https://bitbucket.org/espinosa/spring-integration-tcp-demo
I will try to put all suggested solutions there.


Solution

  • It's much simpler with the DSL...

    @SpringBootApplication
    @EnableScheduling
    public class So55154418Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So55154418Application.class, args);
        }
    
        @Bean
        public IntegrationFlow server() {
            return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1234)))
                    .transform(Transformers.objectToString())
                    .log()
                    .handle((p, h) -> "OK")
                    .get();
        }
    
        @Bean
        public IntegrationFlow client() {
            return IntegrationFlows.from(Gate.class)
                    .handle(Tcp.outboundGateway(Tcp.netClient("localhost", 1234)))
                    .transform(Transformers.objectToString())
                    .handle((p, h) -> {
                        System.out.println("Received:" + p);
                        return null;
                    })
                    .get();
        }
    
        @Bean
        @DependsOn("client")
        public Runner runner(Gate gateway) {
            return new Runner(gateway);
        }
    
        public static class Runner {
    
            private final Gate gateway;
    
            public Runner(Gate gateway) {
                this.gateway = gateway;
            }
    
            @Scheduled(fixedDelay = 5000)
            public void run() {
                this.gateway.send("foo");
            }
    
        }
    
        public interface Gate {
    
            void send(String out);
    
        }
    
    }
    

    Or, get the reply from the Gate method...

        @Bean
        public IntegrationFlow client() {
            return IntegrationFlows.from(Gate.class)
                    .handle(Tcp.outboundGateway(Tcp.netClient("localhost", 1234)))
                    .transform(Transformers.objectToString())
                    .get();
        }
    
        @Bean
        @DependsOn("client")
        public Runner runner(Gate gateway) {
            return new Runner(gateway);
        }
    
        public static class Runner {
    
            private final Gate gateway;
    
            public Runner(Gate gateway) {
                this.gateway = gateway;
            }
    
            @Scheduled(fixedDelay = 5000)
            public void run() {
                String reply = this.gateway.sendAndReceive("foo"); // null for timeout
                System.out.println("Received:" + reply);
            }
    
        }
    
        public interface Gate {
    
            @Gateway(replyTimeout = 5000)
            String sendAndReceive(String out);
    
        }
    

    Bonus:

    Consuming endpoints are actually comprised of 2 beans; a consumer and a message handler. The channel goes on the consumer. See here.

    EDIT

    An alternative, for a single bean for the client...

    @Bean
    public IntegrationFlow client() {
        return IntegrationFlows.from(() -> "foo", 
                        e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
                .handle(Tcp.outboundGateway(Tcp.netClient("localhost", 1234)))
                .transform(Transformers.objectToString())
                .handle((p, h) -> {
                    System.out.println("Received:" + p);
                    return null;
                })
                .get();
    }