Search code examples
javaspring-integrationspring-integration-dslspring-integration-ip

Echo socket service in Spring Integration DSL using Channels and Gateways


This is variant of my question How to implement simple echo socket service in Spring Integration DSL. A good working solutions was introduced but I would like to explore alternatives. Particularly I am interested in solution based on using inbound and outbound channels explicitly, in client and server implementations. Is that possible?

So far I was able to come up with:

HeartbeatClientConfig

...
@Bean
public IntegrationFlow heartbeatClientFlow(
        TcpNetClientConnectionFactory clientConnectionFactory,
        MessageChannel outboundChannel,
        PollableChannel inboundChannel) {
    return IntegrationFlows
            .from(outboundChannel)
            .handle(Tcp.outboundGateway(clientConnectionFactory))
            .channel(inboundChannel)
            .get();
}
...

HeartbeatClient

public HeartbeatClient(MessageChannel outboudChannel, PollableChannel inboundChannel) {
    this.inboundChannel = inboundChannel;
    this.outboudChannel = outboudChannel;
}
...
void run() {
    // ..in scheduled intervals in loop 
    outboudChannel.send(new GenericMessage<String>("status"));
    Message<?> message = inboundChannel.receive(1000);
}

The client part seems to be working fine. Problem is on the server side.

HeartbeatServer

public HeartbeatServer(PollableChannel inboundChannel, MessageChannel outboudChannel) {
    this.inboundChannel = inboundChannel;
    this.outboudChannel = outboudChannel;
}
...
void run() {
    // ..in some kind of loop
    Message<?> message = inboundChannel.receive(1000); // presumably a blocking call
    ...
    outboudChannel.send(new GenericMessage<>("OK"));
    ...
}

HeartbeatServerConfig
Here comes the most tricky part where I am sure I am wrong. I just don't know what I should do. Here I naively use inverse approach from client implementation, where it seems to be working; inverse in sense of switching inbound and outbound channels in the Flow definition.

...
@Bean
public IntegrationFlow heartbeatServerFlow(
        MessageChannel outboundChannel,
        PollableChannel inboundChannel) {
    return IntegrationFlows
            .from(inboundChannel)
            .handle(Tcp.inboundGateway(Tcp.netServer(7777)))
            .channel(outboundChannel)
            .get();
}
...

The server does not work, throwing cryptic exception about Found ambiguous parameter type [class java.lang.Boolean] for method match ... followed by a long list of Spring and Spring Integration methods.

Full source code can be found here.


Solution

  • You can't start the server-side flow with a channel.

    The flow starts with the gateway; it handles all the socket communication. When is receives a message it sends it to a channel.

    You could do this...

    @Bean
    public IntegrationFlow server(PollableChannel requests, MessageChannel replies) {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1234))
                    .replyChannel(replies))
                .transform(Transformers.objectToString())
                .channel(requests)
                .get();
    }
    

    But I would ask why you would want to because now you have to manage your own thread to receive from the request channel and write to the reply channel. In order for this to work, the replyChannel header from the request message must be copied to the reply message. In fact, you don't really need a reply channel; you can send the reply to the replyChannel header directly (that's what happens internally, we bridge the reply channel to the header channel).

    It's much simpler to handle the request on the gateway's thread.