Search code examples
javaspringspring-integrationspring-integration-dsl

Adding resequencer to output channel removes NoSuchBeanDefinitionException and changes ChannelMapper behaviour


I am using multiple IntegrationFlows to process incoming messages. On the first channel I select the initial Channel for processing:

@Bean
public IntegrationFlow mappingFlow() {
       return IntegrationFlows.from(inputChannel())
            //route payload to channel
            .handle(Message.class, (payload, header) -> (byte[])payload.getPayload())
            .<byte[], Integer> route(p -> Parser.getType(p)), 
                    mapping -> mapping
                     .channelMapping(1, "1Channel")
                     .channelMapping(2, "2Channel")
                     .channelMapping(3, "3Channel")
                     .channelMapping(4, "4Channel")
                     )
            .get();
}

There are several other Integrationflows in between, which process the message and then copy the sequence number from the incoming payload to the message headers:

@Bean
public IntegrationFlow process1Channel() {
    return IntegrationFlows.from("1Channel")
            .enrichHeaders(h ->
            h.headerFunction(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, message -> Parser.getSequenceNumber(byte[])(message.getPayload())
            .handle(new GenericHandler<byte[]>() {                          
                @Override
                public Object handle(byte[] payload, MessageHeaders headers) {
                    ...
                    return output;
                }

              })
              .channel("output")
              .get();
}

In the end, there is an output channel, which sends the created reply message back to the caller, which works well like this:

@Bean
public IntegrationFlow processOutput() {
    return IntegrationFlows.from("output")
            .handle(new GenericHandler<byte[]>() {                          
                @Override
                public Object handle(byte[] payload, MessageHeaders headers) {
                    ...
                    return payload;
                }

              })
              .get();
}

However, since I need to make sure that resequencing is based on the SequenceNumber that is set in the preceeding integrationflows, I tried to add the following resequencing (There is probably a better way to specify the correlationExpression, but not specifying one results in 'java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?'):

.resequence(r -> r.releasePartialSequences(true).order(Ordered.LOWEST_PRECEDENCE).correlationExpression("headers[ip_connectionId]"))

Now the issue is, that it works, but there is no Exception anymore, if an unknown/undefined input Message type (e.g. msg type 8) is sent: Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named '8' available

Putting:

//                       .resolutionRequired(true)

on the channel mapping of the initial flow has no effect either, it just seems to be ignored as soon as the resequence is added to the output channel (output channel is the only place, where I have put the resequence). Furthermore, if the resequence is added to output, the mapping on input seems not to process incoming unknown messages at all, the whole processing is just stuck.

Am I doing something wrong by just adding that resequence method above, am I missing defaults which I need to set explicitly when specifying the resequence or is this a bug (adding resequence on outputChannel changes inputChannel mapping behaviour)?

Thanks.


Solution

  • The resequencer works well only if your messages have a IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER header. Then when your release strategy returns true, the messages in the group are sorted and emitted only those which are sequential without gaps in between.

    Your router mapping concern probably fully not relevant to the resequencer question: resolutionRequired is true by default anyway...