
Spring Integration FileAggregator not invoked when the same file is placed for reprocessing


In the flow below, splitting and aggregation work fine for new (different) files. When an already processed file is placed again and picked up by the poller, it goes through fileSplitterSpec() but never reaches the FileAggregator.

Is some marker recorded for a processed file that prevents it from reaching the aggregator the second time?

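return IntegrationFlows.from(txnChannel())
        .split(fileSplitterSpec())
        // FileMarker (START/END) messages skip the steps below and go straight to the aggregator
        .filter(payload -> !(payload instanceof FileMarker), e -> e.discardChannel("aggregatorChannel"))
        // Blank lines are also routed directly to the aggregator
        .<String> filter(StringUtils::isNotBlank, e -> e.discardChannel("aggregatorChannel"))
        .transform(transformer, "transform")
        .handle((payload, headers) -> {
            // Collect each transformed line on the DTO carried in the "indHeader" header
            headers.get("indHeader", DTO.class).getTxn().add(payload);
            return payload;
        })
        .channel("aggregatorChannel")
        .aggregate(new FileAggregator())
        .get();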


@Bean
public FileSplitterSpec fileSplitterSpec() {

    return Files.splitter()
                .markers()
                .firstLineAsHeader("someHeader");
}

Solution

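  • Yes, though the state lives in the aggregator rather than on the file. The correlation key for group aggregation is based on the file name, so a reprocessed file maps to the same group as the first run. By default the aggregator keeps a completed group in its message store, and later messages with the same correlation key are treated as late arrivals and discarded instead of being aggregated. Either use a different file name for the reprocessed file, or configure the aggregator with expireGroupsUponCompletion(true) so that the completed group is removed and a new one can form. To set that option, the new FileAggregator() should go to the processor() option of the aggregator spec: .aggregate(a -> a.processor(new FileAggregator()).expireGroupsUponCompletion(true)).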
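
The effect described in the answer can be reproduced with a small plain-Java model. This is only a sketch of the bookkeeping, not Spring Integration code: the class ToyAggregator, its offer() method, the "END" sentinel standing in for the splitter's end marker, and the file name txn.csv are all hypothetical names chosen for illustration. It assumes one group per file name and release on the end marker.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Toy model of an aggregator's group bookkeeping. NOT Spring Integration code:
 * ToyAggregator, offer() and the "END" sentinel are hypothetical.
 */
public class ToyAggregator {

    private final boolean expireGroupsUponCompletion;
    // Open groups, keyed by correlation key (here: the file name).
    private final Map<String, List<String>> openGroups = new HashMap<>();
    // Keys of groups that were released but not expired.
    private final Set<String> completedKeys = new HashSet<>();
    // Aggregated results, one list of lines per released group.
    private final List<List<String>> released = new ArrayList<>();
    private int discarded = 0;

    public ToyAggregator(boolean expireGroupsUponCompletion) {
        this.expireGroupsUponCompletion = expireGroupsUponCompletion;
    }

    /** Offers one line of a file; "END" stands in for the end-of-file marker. */
    public void offer(String fileName, String line) {
        if (completedKeys.contains(fileName)) {
            discarded++; // late arrival for an already completed group
            return;
        }
        List<String> group = openGroups.computeIfAbsent(fileName, k -> new ArrayList<>());
        if (!"END".equals(line)) {
            group.add(line);
            return;
        }
        // End marker: release the group.
        released.add(openGroups.remove(fileName));
        if (!expireGroupsUponCompletion) {
            completedKeys.add(fileName); // the key stays occupied by the completed group
        }
    }

    public int releasedCount() {
        return released.size();
    }

    public int discardedCount() {
        return discarded;
    }

    /** Sends the same two-line file through a fresh aggregator twice. */
    public static ToyAggregator processSameFileTwice(boolean expire) {
        ToyAggregator aggregator = new ToyAggregator(expire);
        for (int pass = 0; pass < 2; pass++) {
            aggregator.offer("txn.csv", "line1");
            aggregator.offer("txn.csv", "line2");
            aggregator.offer("txn.csv", "END");
        }
        return aggregator;
    }

    public static void main(String[] args) {
        ToyAggregator keep = processSameFileTwice(false);
        ToyAggregator expire = processSameFileTwice(true);
        System.out.println("keep completed groups: released=" + keep.releasedCount()
                + ", discarded=" + keep.discardedCount());
        System.out.println("expire on completion:  released=" + expire.releasedCount()
                + ", discarded=" + expire.discardedCount());
    }
}
```

With the flag off, the second pass over the same file name is dropped entirely in this model, which matches the symptom in the question. With the flag on, the second pass forms and releases a new group.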