How do we resequence()
and aggregate()
the WebFlux output. Kindly suggest.
Below is our flow,
FileReadingMessageSource
-> split(fileSplitterSpec())
-> WebFlux.outboundGateway
-> resequence()
-> aggregate()
return Files.splitter()
.markers()
.applySequence(true)
.aggregate(aggregatorSpec -> aggregatorSpec.processor(new FileAggregator())
.expireGroupsUponCompletion(true)
Update:
{Transfer-Encoding=chunked, CUSTOM_HDR=xxxxxxxxxxxxxxxxxxxxxxx, sequenceNumber=1, errorChannel=xxxxxxxxx, file_name=Test.txt, originalPayload=xxxxxxxxxxxxxxxxxxxxxxx, sequenceSize=0, Connection=keep-alive, file_originalFile=xxxxxxxxxxxxxxxxxxxxxTest.txt, xxx=xxxxxxx, file_marker=START, http_statusCode=200 OK, Date=1692201227000, file_relativePath=Test.txt, b3=c3e09ad7ddfaf837-8c44a5b926144a23-0, nativeHeaders={}, xxxxxxxx=0, correlationId=ded2ea9d-4f72-83fd-9897-6b26ed5f4155, id=5eceaea4-7b6d-1d1d-54ab-14b5d7bb1c77, contentType=application/json, timestamp=1692201227936}
OK. I see your problem. The FileSplitter
does not populate a sequenceSize
since it does not know how many lines are there in a file. The ResequencingMessageHandler
relies on a SequenceSizeReleaseStrategy
by default. And exactly that one fails for you since there is no sequenceSize
to consult with. See if resequence(resequencer -> resequencer.releasePartialSequences(true))
help you some how. The groupTimeout()
probably also can help you when left over is there in a group. Not sure, though, why do you need a resequencer
in a first place since it doesn't look like you are messing up with those lines after splitting some way.