Tags: java, spring, spring-integration, aggregator

Spring Integration: Aggregator pattern - a way for downstream component to know if group is complete


We are using the Aggregator pattern in Spring Integration 6.3.3 with a JdbcMessageStore and a group timeout. We also want the partial group to be sent on expiry, which we treat as an error condition, and we want to log/alert when this error happens.

What is the best way for the downstream component in the IntegrationFlow to know, when it receives the aggregated messages, whether they form a complete group or a partial one?

Here's a simplified example:

    @Autowired
    PetGroupHandler petGroupHandler;

    @Bean
    public JdbcMessageStore jdbcMessageStore(
            @Qualifier("mydbDataSource") DataSource dataSource) {
        JdbcMessageStore messageStore = new JdbcMessageStore(dataSource);
        messageStore.setRegion("petstore-pubsub");
        return messageStore;
    }

    @Bean
    IntegrationFlow petStoreSubscriptionFlow(JdbcMessageStore jdbcMessageStore, PetOutputProcessor petOutputProcessor) {
        return IntegrationFlow.from("petStoreSubscriptionMessageChannel")
                .filter(petOfInterestFilter, "shouldProcess")
                .aggregate(aggregatorSpec -> aggregatorSpec
                        .messageStore(jdbcMessageStore)
                        .outputProcessor(petOutputProcessor)
                        .expireGroupsUponCompletion(true) 
                        .groupTimeout(300 * 1000) // 5 minutes
                        .sendPartialResultOnExpiry(true) // send partial group
                        .correlationStrategy(message -> ((Pet) message.getPayload()).getBreed())
                        .releaseStrategy(group -> group.size() >= 2))
                .handle(petGroupHandler, "handle")
                .get();
    }

The full code for the example above is in this repository: https://github.com/shintasmith/petstore-messaging

When PetGroupHandler in the example above receives a list of Pet objects, is there a way to tell whether the list is a complete group or a partial one, so that the handler can treat the two cases differently?

We tried implementing an output processor that adds a boolean header indicating whether the group is complete. This does not work, because in AbstractCorrelatingMessageHandler.java the group does not appear to be marked complete after the release strategy is called and before the output processor is invoked in the completeGroup() method:

            if (this.releaseStrategy.canRelease(messageGroup)) {
                Collection<Message<?>> completedMessages = null;
                try {
                    noOutput = false;
                    completedMessages = completeGroup(message, correlationKey, messageGroup, lock);
                }
                finally {
                    // Possible clean (implementation dependency) up
                    // even if there was an exception processing messages
                    afterRelease(messageGroup, completedMessages);
                }
                if (!isExpireGroupsUponCompletion() && this.minimumTimeoutForEmptyGroups > 0) {
                    removeEmptyGroupAfterTimeout(groupIdUuid, this.minimumTimeoutForEmptyGroups);
                }
            }

https://github.com/spring-projects/spring-integration/blob/6.3.x/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java#L570-L584

The release strategy in the example above is simple, so one could argue that the handler should just check the list size. In our real application, however, the release strategy is more complex, and we would like the handler not to know anything about it.

Ideally, .sendPartialResultOnExpiry() would accept a channel or handler parameter to which the partial group is sent, but there is no way to specify that.

Any suggestion on how to do this is appreciated! Thanks!


Solution

  • A bit further down in AbstractCorrelatingMessageHandler there is this logic:

    protected void expireGroup(Object correlationKey, MessageGroup group, Lock lock) {
        this.logger.debug(() -> "Expiring MessageGroup with correlationKey[" + correlationKey + "]");
        if (this.sendPartialResultOnExpiry) {
            this.logger.debug(() -> "Prematurely releasing partially complete group with key ["
                    + correlationKey + "] to: " + getOutputChannel());
            completeGroup(correlationKey, group, lock);
        }
        else {
            this.logger.debug(() -> "Discarding messages of partially complete group with key ["
                    + correlationKey + "] to: "
                    + (this.discardChannelName != null ? this.discardChannelName : this.discardChannel));
            if (this.releaseLockBeforeSend) {
                lock.unlock();
            }
            group.getMessages()
                    .forEach(this::discardMessage);
        }
        if (this.applicationEventPublisher != null) {
            this.applicationEventPublisher.publishEvent(
                    new MessageGroupExpiredEvent(this, correlationKey, group.size(),
                            new Date(group.getLastModified()), new Date(), !this.sendPartialResultOnExpiry));
        }
    }
    

    So, with sendPartialResultOnExpiry(false) (the default), all the messages in a partially complete group are discarded. See the docs for more info: https://docs.spring.io/spring-integration/reference/aggregator.html#aggregator-xml.

    Pay attention to bullet #8:

    Indicates that expired messages should be aggregated and sent to the 'output-channel' or 'replyChannel' once their containing MessageGroup is expired (see MessageGroupStore.expireMessageGroups(long)). One way of expiring a MessageGroup is by configuring a MessageGroupStoreReaper. However, you can alternatively expire MessageGroup by calling MessageGroupStore.expireMessageGroups(timeout). You can accomplish that through a Control Bus operation or, if you have a reference to the MessageGroupStore instance, by invoking expireMessageGroups(timeout). Otherwise, by itself, this attribute does nothing. It serves only as an indicator of whether to discard or send to the output or reply channel any messages that are still in the MessageGroup that is about to be expired. Optional (the default is false). NOTE: This attribute might more properly be called send-partial-result-on-timeout, because the group may not actually expire if expire-groups-upon-timeout is set to false.
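Because expireGroup() above also publishes a MessageGroupExpiredEvent whenever an ApplicationEventPublisher is available (normally the case for an aggregator running in an application context), one option for the "log/alert" requirement is simply to listen for that event. A minimal sketch, assuming SLF4J is on the classpath; the class name PetGroupExpiryAlerter is hypothetical, and the getter names should be verified against the 6.3 Javadoc of MessageGroupExpiredEvent:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.integration.aggregator.MessageGroupExpiredEvent;
import org.springframework.stereotype.Component;

// Hypothetical alerting component; swap the log call for your real alerting mechanism.
@Component
public class PetGroupExpiryAlerter {

    private static final Logger log = LoggerFactory.getLogger(PetGroupExpiryAlerter.class);

    // Called for every group an aggregator expires. isDiscarded() is false when the
    // partial group was released (sendPartialResultOnExpiry(true)) and true when its
    // messages were sent to the discard channel instead.
    @EventListener
    public void onGroupExpired(MessageGroupExpiredEvent event) {
        log.warn("Message group [{}] expired with {} message(s), discarded={}",
                event.getGroupId(), event.getMessageCount(), event.isDiscarded());
    }
}
```

This gives an alerting hook, but it does not mark the released message itself. If the application context contains several aggregators, event.getSource() identifies which handler expired the group.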

    UPDATE

    We kinda want to get messages aggregated in a group together by their correlationId.

    I see your point, and that indeed makes sense in some cases. Please consider raising a GitHub issue, and we will see what we can do. Perhaps a simple discardIndividually flag on AbstractCorrelatingMessageHandler, to send either a single discard message for the whole expired (incomplete) group or, as it works right now, one message after another.
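With the current one-message-at-a-time behavior, partial groups can still be kept away from the normal output: with sendPartialResultOnExpiry(false), expireGroup() hands every message of the expired group to the aggregator's discard channel. A minimal configuration sketch, assuming the aggregator spec in the question is changed to `.sendPartialResultOnExpiry(false)` plus `.discardChannel("partialPetGroupChannel")`; the channel name, class name, and log category are hypothetical:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.handler.LoggingHandler;

@Configuration
public class PartialPetGroupConfig {

    // Assumes the aggregator in petStoreSubscriptionFlow is configured with:
    //     .sendPartialResultOnExpiry(false)
    //     .discardChannel("partialPetGroupChannel")
    // Each message of an expired group then arrives on this channel on its own.
    // "partialPetGroupChannel" and "petstore.partial-group" are hypothetical names.
    @Bean
    IntegrationFlow partialPetGroupFlow() {
        return IntegrationFlow.from("partialPetGroupChannel")
                .log(LoggingHandler.Level.WARN, "petstore.partial-group")
                .get();
    }
}
```

Complete groups still reach PetGroupHandler through the output channel, so that handler never sees partial groups. The trade-off is the one described above: the partial group arrives as individual Pet messages rather than as one aggregated list.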

    To implement this in your project right now, take a look at the MessageGroupProcessor strategy. I see you already use petOutputProcessor.

    It accepts a MessageGroup, which has an isComplete() property. So, when you build the reply message for an incomplete group, you can add a header and examine it in a filter after this aggregator to decide whether to proceed downstream or discard the message, since your logic treats it as an error.
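A minimal sketch of that idea. The question reports that group.isComplete() is still false at the moment the output processor runs, so, as an assumption that differs from the suggestion above, this version re-evaluates the release strategy inside the processor: a group released normally satisfies it, while a partial group released on expiry does not. The header name petGroupComplete is hypothetical, Pet is the payload class from the question's repository, and the ReleaseStrategy is assumed to be a shared bean that is also passed to `.releaseStrategy(...)`:

```java
import java.util.List;

import org.springframework.integration.aggregator.MessageGroupProcessor;
import org.springframework.integration.aggregator.ReleaseStrategy;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.support.MessageBuilder;

// Pet is the question's payload class; import it from your own package.
public class PetOutputProcessor implements MessageGroupProcessor {

    // Hypothetical header name, read by a filter or router after the aggregator.
    public static final String COMPLETE_GROUP_HEADER = "petGroupComplete";

    // The same ReleaseStrategy instance the aggregator uses (assumed stateless).
    private final ReleaseStrategy releaseStrategy;

    public PetOutputProcessor(ReleaseStrategy releaseStrategy) {
        this.releaseStrategy = releaseStrategy;
    }

    @Override
    public Object processMessageGroup(MessageGroup group) {
        // Ask the release strategy again: true for a normal release,
        // false for a partial group released because of the group timeout.
        boolean complete = this.releaseStrategy.canRelease(group);
        List<Pet> pets = group.getMessages().stream()
                .map(message -> (Pet) message.getPayload())
                .toList();
        // Returning a Message lets the processor control the outgoing headers.
        return MessageBuilder.withPayload(pets)
                .setHeader(COMPLETE_GROUP_HEADER, complete)
                .build();
    }
}
```

With `@Bean ReleaseStrategy petReleaseStrategy() { return group -> group.size() >= 2; }` shared by the aggregator and this processor, a downstream `.filter(Message.class, m -> Boolean.TRUE.equals(m.getHeaders().get(PetOutputProcessor.COMPLETE_GROUP_HEADER)), f -> f.discardChannel("partialPetGroupChannel"))` placed before `.handle(petGroupHandler, "handle")` lets PetGroupHandler see only complete groups without knowing the release rule. Re-evaluating the strategy is only safe when it depends solely on the group contents; a time-based or otherwise stateful strategy may answer differently on the second call.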