Search code examples
spring-integration

Spring Integration resequencer does not release the last group of messages


I have the following configuration:

    @Bean
    public IntegrationFlow messageFlow(JdbcMessageStore groupMessageStore, TransactionSynchronizationFactory syncFactory, TaskExecutor te, ThreadPoolTaskScheduler ts, RealTimeProcessor processor) {
        return IntegrationFlows
                .from("inputChannel")
                .handle(processor, "handleInputMessage", consumer -> consumer
                        .taskScheduler(ts)
                        .poller(poller -> poller
                                .fixedDelay(pollerFixedDelay)
                                .receiveTimeout(pollerReceiveTimeout)
                                .maxMessagesPerPoll(pollerMaxMessagesPerPoll)
                                .taskExecutor(te)
                                .transactional()
                                .transactionSynchronizationFactory(syncFactory)))
                .resequence(s -> s.messageStore(groupMessageStore)
                        .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(50, 30000)))
                .channel("sendingChannel")
                .handle(processor, "sendMessage")
                .get();
    }

If I send a single batch of e.g. 100 messages to the inputChannel it works as expected until there are no messages in the inputChannel. After the inputChannel becomes empty it also stops processing for messages that were waiting for sequencing. As a result there are always a couple of messages left in the groupMessageStore even after the set release timeout.

I'm guessing it's because the poller is configured only for the inputChannel and if there are no messages in there it will never get to the sequencer (so will never call canRelease on the release strategy). But if I try adding a separate poller for the resequencer I get the following error A poller should not be specified for endpoint since channel x is a SubscribableChannel (not pollable).

Is there a different way to configure it so that the last group of messages is always released?


Solution

  • The release strategy is passive and needs something to trigger it to be called.

    Add .groupTimeout(...) to release the partial sequence after the specified time elapses.

    EDIT

    @SpringBootApplication
    public class So67993972Application {
    
        private static final Logger log = LoggerFactory.getLogger(So67993972Application.class);
    
        public static void main(String[] args) {
            SpringApplication.run(So67993972Application.class, args);
        }
    
        @Bean
        IntegrationFlow flow(MessageGroupStore mgs) {
            return IntegrationFlows.from(MessageChannels.direct("input"))
                    .resequence(e -> e.messageStore(mgs)
                                        .groupTimeout(5_000)
                                        .sendPartialResultOnExpiry(true)
                                        .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(50, 2000)))
                    .channel(MessageChannels.queue("output"))
                    .get();
        }
    
        @Bean
        MessageGroupStore mgs() {
            return new SimpleMessageStore();
        }
    
        @Bean
        public ApplicationRunner runner(MessageChannel input, QueueChannel output, MessageGroupStore mgs) {
            return args -> {
                MessagingTemplate template = new MessagingTemplate(input);
                log.info("Sending");
                template.send(MessageBuilder.withPayload("foo")
                        .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "bar")
                        .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2)
                        .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
                        .build());
                log.info(output.receive(10_000).toString());
                Thread.sleep(1000);
                log.info(mgs.getMessagesForGroup("bar").toString());
            };
        }
    
    }