In my project, I have the following List of AccountWithFiles objects:
@AllArgsConstructor
@Getter
class AccountWithFiles {
private String account;
private List<S3FileInfo> s3FileInfoList;
}
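For context, S3FileInfo is essentially a simple POJO with the fields you can see in the log output further down (fileName, timeStamp, serviceName, accountLogin, operationType); a rough sketch, with the field types partly assumed:
@AllArgsConstructor
@Getter
@ToString // matches the S3FileInfo(...) format seen in the logs
class S3FileInfo {
    private String fileName;
    private Instant timeStamp; // type assumed for this sketch
    private String serviceName;
    private String accountLogin;
    private FILE_OPERATION_TYPE operationType; // ADD or DELETE
}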
I want to process each AccountWithFiles separately in a new thread. Within each of those threads, I then want to split s3FileInfoList with split() and process its items one by one with a 20-minute delay between them, while the accounts themselves are still handled in parallel.
So I have the following DSL definition:
@Bean
public IntegrationFlow s3DownloadFlowEnhanced() {
return IntegrationFlows.fromSupplier(s3FileInfoRepository::findAllGroupByAccount,
c -> c.poller(Pollers.cron(time, TimeZone.getTimeZone(zone))).id("s3TEMPO"))
.channel("manualS3EnhancedFlow")
.split()
.channel("myChannel")
.get();
}
s3FileInfoRepository::findAllGroupByAccount returns the list of AccountWithFiles objects. After that I split them and send them to a MessageChannels channel backed by an Executors thread pool (with a defined number of threads):
@Bean
public MessageChannel myChannel() {
return MessageChannels.publishSubscribe(Executors.newFixedThreadPool(10)).get();
}
After that, each account is processed separately in its own flow:
@Bean
public IntegrationFlow processEachAccountSeparately() {
return IntegrationFlows.from("myChannel")
.<AccountWithFiles, Object>transform(m -> m.getS3FileInfoList().stream().sorted(
Comparator.comparing(i -> i.getOperationType() == FILE_OPERATION_TYPE.ADD))
.collect(Collectors.toList()))
.log()
//.resequence()
.split()
.channel("bookItemsChannel")
.get();
}
@Bean
public PollableChannel bookItemsChannel(){
return new QueueChannel();
}
@Bean
public IntegrationFlow test() {
return IntegrationFlows.from("bookItemsChannel")
.delay("delayer.messageGroupId", d -> d
.defaultDelay(25000L)
.delayExpression("headers['delay']"))
.log()
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.initialize();
pollerMetadata.setTaskExecutor(taskExecutor);
pollerMetadata.setTrigger(new PeriodicTrigger(15000L));
pollerMetadata.setMaxMessagesPerPoll(3);
return pollerMetadata;
}
When messages are received by the pollable channel, they are processed one by one with a delay. I want my messages to be processed one by one within each group, but in parallel across the groups produced by the splitter in the s3DownloadFlowEnhanced flow.
I know that with pollable channels the sender and the receiver of a message run in different threads. Maybe there is some workaround here?
In the processEachAccountSeparately flow I can see that each account gets its own thread:
2021-06-24 15:33:34.585 INFO 56174 --- [pool-4-thread-1] o.s.integration.handler.LoggingHandler : GenericMessage [payload=[S3FileInfo(fileName=sdfsdf, timeStamp=null, serviceName=null, accountLogin=login2, operationType=ADD)], headers={sequenceNumber=1, correlationId=36b132ac-7c5b-af66-96c0-2334a757c960, id=3bafdaa7-ed4c-087f-5f2c-cc114eae42cd, sequenceSize=2, timestamp=1624538014577}]
2021-06-24 15:33:34.585 INFO 56174 --- [pool-4-thread-2] o.s.integration.handler.LoggingHandler : GenericMessage [payload=[S3FileInfo(fileName=sdfjsjfj, timeStamp=null, serviceName=null, accountLogin=login1, operationType=DELETE), S3FileInfo(fileName=s3/outgoing/file2, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=DELETE), S3FileInfo(fileName=outgoing/s3/ipass.xlsx, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=ADD), S3FileInfo(fileName=dsfsdf, timeStamp=null, serviceName=null, accountLogin=login1, operationType=ADD)], headers={sequenceNumber=2, correlationId=36b132ac-7c5b-af66-96c0-2334a757c960, id=d8506721-2cfd-b6da-d353-4fb8bd5744fb, sequenceSize=2, timestamp=1624538014577}]
However, the PollableChannel then processes the messages one by one:
2021-06-24 15:33:46.328 INFO 56174 --- [lTaskExecutor-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=S3FileInfo(fileName=sdfjsjfj, timeStamp=null, serviceName=null, accountLogin=login1, operationType=DELETE), headers={sequenceNumber=1, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=7f8bd9a6-25ce-0bb2-c3f3-581d823d8fce, sequenceSize=4, timestamp=1624538014585}]
2021-06-24 15:33:46.329 INFO 56174 --- [lTaskExecutor-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=S3FileInfo(fileName=sdfsdf, timeStamp=null, serviceName=null, accountLogin=login2, operationType=ADD), headers={sequenceNumber=1, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 1, 2]], correlationId=3bafdaa7-ed4c-087f-5f2c-cc114eae42cd, id=f697b52b-1053-51aa-232f-88bb602dc1c9, sequenceSize=1, timestamp=1624538014585}]
2021-06-24 15:33:46.329 INFO 56174 --- [lTaskExecutor-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=S3FileInfo(fileName=s3/outgoing/file2, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=DELETE), headers={sequenceNumber=2, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=a6754c98-fce0-f132-664a-65d61f553ae2, sequenceSize=4, timestamp=1624538014585}]
2021-06-24 15:34:01.333 INFO 56174 --- [lTaskExecutor-4] o.s.integration.handler.LoggingHandler : GenericMessage [payload=S3FileInfo(fileName=outgoing/s3/ipass.xlsx, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=ADD), headers={sequenceNumber=3, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=71fa915a-fcaa-3d00-023b-5cf51be3b183, sequenceSize=4, timestamp=1624538014585}]
2021-06-24 15:34:01.333 INFO 56174 --- [lTaskExecutor-4] o.s.integration.handler.LoggingHandler : GenericMessage [payload=S3FileInfo(fileName=dsfsdf, timeStamp=null, serviceName=null, accountLogin=login1, operationType=ADD), headers={sequenceNumber=4, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=7c513e23-5484-4f61-b7d3-362648c7b89c, sequenceSize=4, timestamp=1624538014585}]
What I want is to have something like this:
[pool-4-thread-1] simultaneously
[pool-4-thread-2] simultaneously
[pool-4-thread-2] 20 min delay
[pool-4-thread-2] 20 min delay
[pool-4-thread-2] 20 min delay
Your first step of splitting the list of AccountWithFiles is correct. Then you say that you'd like to split s3FileInfoList and process its items sequentially, one by one, but why do you place them into a QueueChannel? The regular, default DirectChannel would be enough in this case.
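For example, a minimal sketch of that change, keeping your bean name and simply dropping the queue:
@Bean
public MessageChannel bookItemsChannel() {
    // A plain DirectChannel: the downstream delayer is then reached on the
    // same per-account executor thread that performed the split.
    return new DirectChannel();
}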
However, you then go to delay(), which does not block the current thread, but schedules a task for the future on a separate thread. So you probably need to rethink your solution: with the current approach, even if you get rid of that bookItemsChannel as a queue, you are still not going to process the delayed messages sequentially. There is simply no guarantee from the TaskScheduler about which task is going to be executed first when they all have the same scheduled time.
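Just to illustrate what such a rethink could look like (this is only a sketch, not part of the recommendation above; process() is a placeholder for your own per-file logic, and each sleeping account ties up one of the 10 executor threads for the whole duration of its pauses), you could block the per-account executor thread instead of scheduling a delay:
@Bean
public IntegrationFlow processEachAccountSeparately() {
    return IntegrationFlows.from("myChannel")
            .handle(AccountWithFiles.class, (account, headers) -> {
                // Iterate this account's files sequentially on the per-account
                // executor thread coming from myChannel.
                for (S3FileInfo fileInfo : account.getS3FileInfoList()) {
                    process(fileInfo); // placeholder for the real per-file work
                    try {
                        TimeUnit.MINUTES.sleep(20); // blocking pause between files
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
                return null; // no reply - the flow ends here for this account
            })
            .get();
}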