I'm trying to implement the following scenario:
My idea was to read the files in alphabetical order and release a group once 2 seconds have passed since it was last modified (that is, once g.getLastModified() is smaller than the current time minus 2 seconds).
I've tried the following without success:
return IntegrationFlows.from(Files.inboundAdapter(tmpDir.getRoot())
.patternFilter("*.json")
.useWatchService(true)
.watchEvents(FileReadingMessageSource.WatchEventType.CREATE,
FileReadingMessageSource.WatchEventType.MODIFY),
e -> e.poller(Pollers.fixedDelay(100)
.errorChannel("filePollingErrorChannel")))
.enrichHeaders(h -> h.headerExpression("CORRELATION_PATTERN", "headers[" + FileHeaders.FILENAME + "].substring(0,7)")) // docxxxx.length()
.aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
.releaseStrategy(g -> g.getLastModified() < System.currentTimeMillis() - 2000))
.channel(MessageChannels.queue("fileReadingResultChannel"))
.get();
Changing the release strategy to the following also didn't work:
.aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
.releaseStrategy(g -> {
Stream<Message<?>> stream = g.getMessages()
.stream();
Long timestamp = (Long) stream.skip(stream.count() - 1)
.findFirst()
.get()
.getHeaders()
.get(MessageHeaders.TIMESTAMP);
System.out.println("Timestamp: " + timestamp);
return timestamp.longValue() < System.currentTimeMillis() - 2000;
}))
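A side note on the snippet above, which may be part of why it misbehaves: it calls both count() and skip() on the same Stream instance. count() is a terminal operation, so the later skip() call should fail with an IllegalStateException. Here is a minimal plain-Java sketch of that behaviour, without any Spring Integration classes. The class and method names (StreamReuseDemo, lastViaReusedStream, lastViaReduce) are made up for illustration.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical demo class; not part of the integration flow.
final class StreamReuseDemo {

    // Mirrors the pattern from the release strategy: count() consumes the
    // stream, so the skip() call operates on an already-used stream.
    static <T> T lastViaReusedStream(List<T> items) {
        Stream<T> stream = items.stream();
        return stream.skip(stream.count() - 1).findFirst().get();
    }

    // One way to pick the last element in a single pass over a fresh stream.
    static <T> Optional<T> lastViaReduce(List<T> items) {
        return items.stream().reduce((first, second) -> second);
    }

    public static void main(String[] args) {
        List<Long> timestamps = List.of(100L, 200L, 300L);
        System.out.println("Last via reduce: " + lastViaReduce(timestamps));
        try {
            lastViaReusedStream(timestamps);
        } catch (IllegalStateException e) {
            System.out.println("Reused stream failed: " + e.getMessage());
        }
    }
}
```

If the exception is swallowed or routed to an error channel, that could also explain why the println never shows up, although I have not verified that this is what happens inside the flow.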
Am I misunderstanding the release strategy concept?
Also, is it possible to print something from inside the releaseStrategy block? I wanted to inspect the timestamp (see the System.out.println("Timestamp: " + timestamp); call above).
I found a solution using a different approach, though I still don't understand why the attempts above didn't work.
I've also found a cleaner way of defining the correlation function:
IntegrationFlows.from(Files.inboundAdapter(tmpDir.getRoot())
.patternFilter("*.json")
.useWatchService(true)
.watchEvents(FileReadingMessageSource.WatchEventType.CREATE,
FileReadingMessageSource.WatchEventType.MODIFY),
e -> e.poller(Pollers.fixedDelay(100)))
.enrichHeaders(h -> h.headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID,
m -> ((String) m.getHeaders().get(FileHeaders.FILENAME)).substring(0, 17)))
.aggregate(a -> a.groupTimeout(2000)
.sendPartialResultOnExpiry(true))
.channel(MessageChannels.queue("fileReadingResultChannel"))
.get();
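My best guess so far at why the time-based release strategy never fired while groupTimeout does: I assume the aggregator only consults the release strategy when a message arrives, right after the group's last-modified time has been refreshed, whereas a group timeout is driven by a timer. The toy model below is plain Java and not Spring Integration code. ToyGroup, ToyAggregator, onMessage, and onTimerTick are hypothetical names, and the clock is passed in explicitly to keep the sketch deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical stand-in for a message group.
final class ToyGroup {
    final List<String> messages = new ArrayList<>();
    long lastModified;
}

// Hypothetical toy aggregator modelling the two assumed trigger paths.
final class ToyAggregator {
    private final ToyGroup group = new ToyGroup();
    private final BiPredicate<ToyGroup, Long> releaseStrategy;
    private final long groupTimeoutMillis; // 0 means "no timeout configured"
    final List<List<String>> released = new ArrayList<>();

    ToyAggregator(BiPredicate<ToyGroup, Long> releaseStrategy, long groupTimeoutMillis) {
        this.releaseStrategy = releaseStrategy;
        this.groupTimeoutMillis = groupTimeoutMillis;
    }

    // Assumption: the strategy runs only here, after lastModified was set to "now".
    void onMessage(String payload, long now) {
        group.messages.add(payload);
        group.lastModified = now;
        if (releaseStrategy.test(group, now)) {
            release();
        }
    }

    // Assumption: a timeout releases an idle group without any new message.
    void onTimerTick(long now) {
        boolean idleLongEnough = now - group.lastModified >= groupTimeoutMillis;
        if (groupTimeoutMillis > 0 && !group.messages.isEmpty() && idleLongEnough) {
            release();
        }
    }

    private void release() {
        released.add(new ArrayList<>(group.messages));
        group.messages.clear();
    }

    public static void main(String[] args) {
        // Same condition as in my first attempt: last modified more than 2 s ago.
        BiPredicate<ToyGroup, Long> idleFor2s = (g, now) -> g.lastModified < now - 2000;

        ToyAggregator strategyOnly = new ToyAggregator(idleFor2s, 0);
        strategyOnly.onMessage("doc0001_a.json", 0);
        strategyOnly.onMessage("doc0001_b.json", 100);
        strategyOnly.onTimerTick(5000);
        System.out.println("Strategy only: " + strategyOnly.released);

        ToyAggregator withTimeout = new ToyAggregator(idleFor2s, 2000);
        withTimeout.onMessage("doc0001_a.json", 0);
        withTimeout.onMessage("doc0001_b.json", 100);
        withTimeout.onTimerTick(5000);
        System.out.println("With timeout: " + withTimeout.released);
    }
}
```

In this model the strategy can never be true at the moment it is evaluated, because the group was modified an instant earlier, so only the timer path releases anything. Whether the real aggregator behaves exactly like this is something I would still like confirmed.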