
Spring Integration JdbcOperations.queryForStream() - split and aggregate


We are using JdbcOperations.queryForStream() to fetch 30k+ rows from the database, as suggested in "Spring Integration Jdbc OutboundGateway returning 1 record ONLY even with MaxRows(0)". However, split() and aggregate() on the stream are not working. We need the aggregation to work so that we know when all the stream records have been consumed and can perform a final operation.


Solution

  • The splitter doesn't know the size of Iterator, Stream or Flux request message payloads: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#iterators. The sequenceSize header is therefore 0, and the default aggregator cannot do its job because there is no sequenceSize to compare the group size with. You must provide a custom releaseStrategy or rely on a groupTimeout to perform that final operation.

    Another trick can be done with JDBC: before calling queryForStream(), query for the count of records and set that value into a header of the reply message before the splitter. You can then use that header in a custom releaseStrategy.

    For more on aggregator features, see the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator
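The count-header trick can be modeled in plain Java to show the release check on its own. This is only a sketch under stated assumptions and does not use the Spring Integration API: the Msg class, the expectedCount header name, and the splitAndAggregate helper are all hypothetical stand-ins for a message, a custom header, and the splitter/aggregator pair.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CountBasedRelease {

    // Hypothetical header name; not a Spring Integration built-in.
    static final String EXPECTED_COUNT = "expectedCount";

    // Minimal stand-in for a message: a payload plus headers.
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // The decision a custom release strategy would make: release once the
    // group holds as many messages as the count header announced.
    static boolean canRelease(List<Msg> group) {
        if (group.isEmpty()) {
            return false;
        }
        Object expected = group.get(0).headers.get(EXPECTED_COUNT);
        return expected instanceof Integer && group.size() >= (Integer) expected;
    }

    // Models split + aggregate: every row becomes a message carrying the
    // count obtained beforehand (e.g. from a separate COUNT query).
    // Returns the aggregated payloads, or empty if the group never released.
    static Optional<List<Object>> splitAndAggregate(Stream<?> rows, int rowCount) {
        List<Msg> group = new ArrayList<>();
        for (Object row : (Iterable<Object>) rows.map(r -> (Object) r)::iterator) {
            group.add(new Msg(row, Collections.singletonMap(EXPECTED_COUNT, rowCount)));
            if (canRelease(group)) {
                return Optional.of(group.stream()
                        .map(m -> m.payload)
                        .collect(Collectors.toList()));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Count matches the stream size: the group is released.
        System.out.println(splitAndAggregate(Stream.of("a", "b", "c"), 3).isPresent());
        // Count is larger than the stream: the group is never released.
        System.out.println(splitAndAggregate(Stream.of("a", "b", "c"), 5).isPresent());
    }
}
```

In a real flow the same comparison would live in the aggregator's releaseStrategy, reading the header from a message in the group. If rows can change between the count query and the stream query, the two numbers may disagree and the group would never release, so a groupTimeout is still a sensible fallback.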