Search code examples
javaspringtcpspring-integrationsplitter

Spring Integration version 3.0: Splitting TCP stream message into multiple Messages based on content


I am connecting to some legacy server socket, using spring integration framework.

The below is my client factory and adaptor:

<int-ip:tcp-connection-factory id="client"
                               type="client" 
                               host="${tcpServer}" 
                               port="${tcpPort}" 
                               single-use="false"
                               using-nio="false" />

<int-ip:tcp-inbound-channel-adapter id="inboundServer"
                                    client-mode="true"
                                    channel="inputStream" 
                                    error-channel="errorChannel"
                                    retry-interval="${retryInterval}"
                                    connection-factory="client" />

And below the stream to string converter:

    <int:transformer id="clientBytes2String" 
                 input-channel="inputStream"
                 output-channel="inputString" 
                 expression="new String(payload)" />

                 <int:channel id="inputString" />

And the following section is empty, as I am not sure what to implement here, so that it can call my router and router will do it's business.


I have tried with splitter, it did work, if the stream comes in desired format of "ABCD EFGH WXYZ" or "ABCD" but if the stream comes like "ABCD XXXX EFGH WXYZ" then its failing. Desired results is it should process 3 messages and 1 error. But instead it processed 1 messages and all rest are ignored.

The code below:

 <int:splitter input-channel="inputString"
              output-channel="preRouter2"                  
              method="splitMessage"
              ref="messageSplitterBean"/>

And MessageSpliterBean class as follows:

@Splitter
public List<Message<?>> splitMessage( Message<?> message ) {

    List<Message<?>> msgFragments = new ArrayList<Message<?>>();

    //Let say I am assuming message will be coming ABCD EFGH WXYZ
    String str[] = message.getPayload().toString().split(" ");
    int counter = 1;
    for ( String s : str ) {

        Message<String> resultMessage = MessageBuilder.withPayload( s )
                .copyHeaders(message.getHeaders())
                .build();
        msgFragments.add(resultMessage);
    }

    return msgFragments;
}

And the following will be my router, which will send to respective channel based on some expression:

<int:recipient-list-router id="customRouter" input-channel="preRouter2">
<int:recipient channel="input1" selector-expression="payload.toString().startsWith('ABCD')"/>   
<int:recipient channel="input2" selector-expression="payload.toString().startsWith('EFGH')"/>
<int:recipient channel="input3" selector-expression="payload.toString().startsWith('WXYZ')"/>

Need your expert view on this: on what I am doing wrong, or what will be the best approach.

The input from server socket will be in stream of data with fixed length and space as separator. And each fixed length I need to convert them into a message and send it to concerned channel.

Regards.


Solution

  • First of all no reason to implement your own Splitter because the default one has delimiters option:

    Another my point is about redundant <int:transformer> for the byte[] -> String. The Spring Integration provides for you ObjectToStringTransformer out-of-the-box.

    And your issue is <int:recipient-list-router>. As you say you may have something wrong in your data and your router isn't ready to process such a message and it fails for your. That's just because (AbstractMessageRouter):

    else {
        throw new MessageDeliveryException(message, "No channel resolved by router '" + this.getComponentName()
                    + "' and no 'defaultOutputChannel' defined.");
    }
    

    Which happens when no one selector-expression accepts your wrong message.

    In this case it is sent to the error-channel="errorChannel" on your <int-ip:tcp-inbound-channel-adapter> and it stops the further process just because the MessageDeliveryException.

    As you see it looks like to fix your issue you should add default-output-channel to the recipient-list-router configuration.