Search code examples
spring-integration

Spring Integration WebFlux.outboundGateway Resequence and Aggregate


How do we resequence() and aggregate() the WebFlux output. Kindly suggest.

Below is our flow,

FileReadingMessageSource -> split(fileSplitterSpec()) -> WebFlux.outboundGateway -> resequence() -> aggregate()

return Files.splitter()
            .markers()
            .applySequence(true)

.aggregate(aggregatorSpec -> aggregatorSpec.processor(new FileAggregator())
                                            .expireGroupsUponCompletion(true)

Update:

{Transfer-Encoding=chunked, CUSTOM_HDR=xxxxxxxxxxxxxxxxxxxxxxx, sequenceNumber=1, errorChannel=xxxxxxxxx, file_name=Test.txt, originalPayload=xxxxxxxxxxxxxxxxxxxxxxx, sequenceSize=0, Connection=keep-alive, file_originalFile=xxxxxxxxxxxxxxxxxxxxxTest.txt, xxx=xxxxxxx, file_marker=START, http_statusCode=200 OK, Date=1692201227000, file_relativePath=Test.txt, b3=c3e09ad7ddfaf837-8c44a5b926144a23-0, nativeHeaders={}, xxxxxxxx=0, correlationId=ded2ea9d-4f72-83fd-9897-6b26ed5f4155, id=5eceaea4-7b6d-1d1d-54ab-14b5d7bb1c77, contentType=application/json, timestamp=1692201227936}

Solution

  • OK. I see your problem. The FileSplitter does not populate a sequenceSize since it does not know how many lines are there in a file. The ResequencingMessageHandler relies on a SequenceSizeReleaseStrategy by default. And exactly that one fails for you since there is no sequenceSize to consult with. See if resequence(resequencer -> resequencer.releasePartialSequences(true)) help you some how. The groupTimeout() probably also can help you when left over is there in a group. Not sure, though, why do you need a resequencer in a first place since it doesn't look like you are messing up with those lines after splitting some way.