Search code examples
spring-integrationspring-integration-dsl

How do I continue to perform aggregation on a single thread that has polling?


I create a flow, which polls rows from database by status, validate them and after that aggregate to collection. After processing the entire flow, each line is set to the appropriate status. But when I use aggregator with release strategy TimeoutCountSequenceSizeReleaseStrategy, and elapsed time is so small, release group doesn't happen. And after that the following polling occurs in another thread, but previous message group wasn't handled until amount of messages won't reach the target(threshold) in the strategy.

Code of my flow:

@Bean
public IntegrationFlow testFlow(EntityService entityService,
                                EntityValidator entityValidator,
                                EntityFlowProperties properties,                                       
                                EntityChecker checker) {
    return IntegrationFlows
            .from(getMessageSource(entityService::getByStatus, properties.getMaxRowsPerPoll()),
                    e -> e.poller(getPollerSpec(properties)))
            .split()
            .transform(entityValidator::validate)
            .filter(ValidationStatus<Entity>::isValid, filter ->
                    filter.discardFlow(flow -> flow.handle(entityService::handleValidationErrors)))
            .transform(ValidationStatus<Entity>::getEntity)
            .aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(5, 10000)))
            .transform(checker::checkOnSomething)
            .split()
            .transform(CheckResultAware<Entity>::getEntity)
            .handle(entityService::saveAndChangeStatus)
            .get();

I expect to perform aggregation on the same thread as polling, and don't make a new polling until the current flow is over.

The way of changing statuses between polling and aggregation isn't suitable.

Is there a way to do this?


Solution

  • Why do you need TimeoutCountSequenceSizeReleaseStrategy; your sequences are finite; just use the default SimpleSequenceSizeReleaseStrategy.

    However the TimeoutCountSequenceSizeReleaseStrategy should release based on the sequence size anyway.

    But, it's not really suitable for your use case because you can be left with a partial group in the store.