We are using JdbcOperations.queryForStream()
to fetch 30k+ rows from database as per the inputs from Spring Integration Jdbc OutboundGateway returning 1 record ONLY even with MaxRows(0), however split()
and aggregate()
on the stream is not working. We need the aggregation to work as to know when all the stream records are consumed to perform a final operation.
The splitter doesn't know the size for an Iterator
, Stream
or Flux
request message payloads: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#iterators. Therefore a sequenceSize
header is 0
and default aggregator cannot do its job just because there is no sequenceSize
to compare with. You must provide a custom releaseStrategy
or rely on a groupTimeout
to perform that final operation.
Another trick could be done with JDBC: before calling queryForStream()
you can ask for count of records and set that value into some header in a reply message before splitter. Such a header you can use in a custom releaseStrategy
.
See more info about aggregator features in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator