spring-integration spring-integration-dsl

Spring Integration aggregator's release strategy based on last modified


I'm trying to implement the following scenario:

  1. I receive a batch of files that share a common naming pattern, e.g. doc0001_page0001, doc0001_page0002, doc0001_page0003, doc0002_page0001 (doc0001 is one document consisting of 3 pages that I need to merge; doc0002 has only 1 page).
  2. I want to aggregate them so that a group is released only once all of the files for a given document have been gathered (doc0001 after 3 files have been picked up, doc0002 after 1 file).
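
The grouping described in the list above can be sketched in plain Java, independent of Spring Integration. This is only an illustration: the class `DocumentGrouping` and the helper `documentKey` are hypothetical names, and the sketch assumes the document key is everything before `_page`, as in the example file names.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class DocumentGrouping {

    // Hypothetical helper: the document key is the part of the name before "_page".
    // Names without "_page" are treated as their own key.
    static String documentKey(String fileName) {
        int idx = fileName.indexOf("_page");
        return idx < 0 ? fileName : fileName.substring(0, idx);
    }

    // Sorts the names alphabetically and groups them by document key.
    static Map<String, List<String>> groupByDocument(List<String> fileNames) {
        return fileNames.stream()
                .sorted()
                .collect(Collectors.groupingBy(
                        DocumentGrouping::documentKey, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<String> files = List.of(
                "doc0002_page0001", "doc0001_page0002", "doc0001_page0001", "doc0001_page0003");
        // prints {doc0001=[doc0001_page0001, doc0001_page0002, doc0001_page0003], doc0002=[doc0002_page0001]}
        System.out.println(groupByDocument(files));
    }
}
```

What this sketch cannot express is the hard part of the scenario: knowing when the last page of a document has arrived.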

My idea was to read the files in alphabetical order and release a group once 2 seconds have passed since it was last modified (i.e. g.getLastModified() is smaller than the current time minus 2 seconds).

I've tried the following without success:

return IntegrationFlows.from(Files.inboundAdapter(tmpDir.getRoot())
                                  .patternFilter("*.json")
                                  .useWatchService(true)
                                  .watchEvents(FileReadingMessageSource.WatchEventType.CREATE,
                                          FileReadingMessageSource.WatchEventType.MODIFY),
        e -> e.poller(Pollers.fixedDelay(100)
                             .errorChannel("filePollingErrorChannel")))
                       .enrichHeaders(h -> h.headerExpression("CORRELATION_PATTERN", "headers[" + FileHeaders.FILENAME + "].substring(0,7)")) // docxxxx.length()
                       .aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
                                        .releaseStrategy(g -> g.getLastModified() < System.currentTimeMillis() - 2000))
                       .channel(MessageChannels.queue("fileReadingResultChannel"))
                       .get();

Changing the release strategy to the following also didn't work:

.aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
                .releaseStrategy(g -> {
                    Stream<Message<?>> stream = g.getMessages()
                                                 .stream();
                    Long timestamp = (Long) stream.skip(stream.count() - 1)
                                                  .findFirst()
                                                  .get()
                                                  .getHeaders()
                                                  .get(MessageHeaders.TIMESTAMP);
                    System.out.println("Timestamp: " + timestamp);
                    return timestamp.longValue() < System.currentTimeMillis() - 2000;

                }))
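
A side note on the snippet above, independent of the aggregator itself: `count()` is a terminal operation, so calling `skip(...)` on the same `Stream` afterwards throws an `IllegalStateException`. That exception would be raised before the `println` is reached. The sketch below reproduces the effect with a plain `List<Long>` standing in for the group's messages. The class and method names are hypothetical, and reading the last element by index assumes a `List`, which the collection returned by `getMessages()` need not be.

```java
import java.util.List;
import java.util.stream.Stream;

public class LastElementDemo {

    // Mirrors the shape of the snippet above: the stream is consumed by count()
    // and then used again, which the Stream API does not allow.
    static boolean reusingStreamThrows(List<Long> timestamps) {
        Stream<Long> stream = timestamps.stream();
        try {
            stream.skip(stream.count() - 1).findFirst();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // One way to read the last element without touching a stream twice
    // (assumes a non-empty list).
    static long lastTimestamp(List<Long> timestamps) {
        return timestamps.get(timestamps.size() - 1);
    }

    public static void main(String[] args) {
        List<Long> timestamps = List.of(1000L, 1100L, 1200L);
        System.out.println(reusingStreamThrows(timestamps)); // prints true
        System.out.println(lastTimestamp(timestamps));       // prints 1200
    }
}
```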

Am I misunderstanding the release strategy concept?

Also, is it possible to print something from inside the releaseStrategy block? I wanted to compare the timestamps (see System.out.println("Timestamp: " + timestamp); above).


Solution

  • I found a solution using a different approach, though I still don't understand why the attempts above didn't work.

    I've also found a cleaner way of defining the correlation function.

    IntegrationFlows.from(Files.inboundAdapter(tmpDir.getRoot())
                               .patternFilter("*.json")
                               .useWatchService(true)
                               .watchEvents(FileReadingMessageSource.WatchEventType.CREATE,
                                            FileReadingMessageSource.WatchEventType.MODIFY),
                          e -> e.poller(Pollers.fixedDelay(100)))
                    .enrichHeaders(h -> h.headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID,
                            m -> ((String) m.getHeaders().get(FileHeaders.FILENAME)).substring(0, 17)))
                    .aggregate(a -> a.groupTimeout(2000)
                                     .sendPartialResultOnExpiry(true))
                    .channel(MessageChannels.queue("fileReadingResultChannel"))
                    .get();
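
A possible explanation for the difference, offered as an assumption rather than a verified diagnosis: a release strategy is consulted when a message is added to a group, and at that moment the group has only just been modified, so a "last modified more than 2 seconds ago" check cannot succeed. A group timeout, by contrast, fires after a quiet period with no new arrivals. The toy model below illustrates the two behaviours with simulated arrival times. `ReleaseTimingModel` and its `Group` class are hypothetical stand-ins and do not use the Spring Integration classes.

```java
import java.util.ArrayList;
import java.util.List;

public class ReleaseTimingModel {

    /** Toy stand-in for a message group; not the Spring Integration class. */
    static final class Group {
        final List<String> messages = new ArrayList<>();
        long lastModified;
    }

    /** Checks the question's condition on every arrival; returns the release time or -1. */
    static long releasedOnArrival(long[] arrivalTimes, long quietPeriod) {
        Group group = new Group();
        for (long now : arrivalTimes) {
            group.messages.add("msg@" + now);
            group.lastModified = now; // assumption: adding a message touches the group
            if (group.lastModified < now - quietPeriod) {
                return now; // never reached: lastModified equals now at this point
            }
        }
        return -1;
    }

    /** Timeout-style release: fires quietPeriod after an arrival if nothing newer came in. */
    static long releasedByTimeout(long[] arrivalTimes, long quietPeriod) {
        for (int i = 0; i < arrivalTimes.length; i++) {
            long deadline = arrivalTimes[i] + quietPeriod;
            boolean isLast = i == arrivalTimes.length - 1;
            if (isLast || arrivalTimes[i + 1] > deadline) {
                return deadline;
            }
        }
        return -1; // no messages at all
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 100, 200}; // three pages arriving 100 ms apart
        System.out.println(releasedOnArrival(arrivals, 2000)); // prints -1
        System.out.println(releasedByTimeout(arrivals, 2000)); // prints 2200
    }
}
```

In this model the arrival-time check never releases the group, while the timeout releases it 2 seconds after the last page, which matches what the working flow above achieves with groupTimeout(2000) and sendPartialResultOnExpiry(true).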