Tags: spring-integration, spring-integration-dsl

Spring Integration: use a PollableChannel simultaneously


In my project, I have a list of the following AccountWithFiles objects.

  @AllArgsConstructor
  @Getter
  class AccountWithFiles {

    private String account;
    private List<S3FileInfo> s3FileInfoList;
  }
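
For reference, S3FileInfo looks roughly like this (a simplified sketch; the field names are taken from the log output further down, the exact types are assumptions):

  @AllArgsConstructor
  @Getter
  @ToString
  class S3FileInfo {

    private String fileName;
    private String timeStamp;      // type assumed, always null in the logs below
    private String serviceName;
    private String accountLogin;
    private FILE_OPERATION_TYPE operationType; // ADD or DELETE
  }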

I want to process each AccountWithFiles separately in a new thread. Then I want to split the s3FileInfoList with split() and process the files one by one with a 20 minute delay between them, while each account's s3FileInfoList is still processed in parallel with the others.

So I have the following DSL definition:

  @Bean
  public IntegrationFlow s3DownloadFlowEnhanced() {
    return IntegrationFlows.fromSupplier(s3FileInfoRepository::findAllGroupByAccount,
        c -> c.poller(Pollers.cron(time, TimeZone.getTimeZone(zone))).id("s3TEMPO"))
        .channel("manualS3EnhancedFlow")
        .split()
        .channel("myChannel")
        .get();
  }

s3FileInfoRepository::findAllGroupByAccount returns the list of AccountWithFiles objects. After that I split the list and send the individual AccountWithFiles to an executor-backed message channel (with a defined number of threads):

  @Bean
  public MessageChannel myChannel() {
    return MessageChannels.publishSubscribe(Executors.newFixedThreadPool(10)).get();
  }

After that:

  @Bean
  public IntegrationFlow processEachAccountSeparately() {
    return IntegrationFlows.from("myChannel")
        .<AccountWithFiles, Object>transform(m -> m.getS3FileInfoList().stream().sorted(
              Comparator.comparing(i -> i.getOperationType() == FILE_OPERATION_TYPE.ADD))
              .collect(Collectors.toList()))
        .log()
        //.resequence()
        .split()
        .channel("bookItemsChannel")
        .get();
  }
  
  @Bean
  public PollableChannel bookItemsChannel() {
    return new QueueChannel();
  }
  
  @Bean
  public IntegrationFlow test() {
    return IntegrationFlows.from("bookItemsChannel")
        .delay("delayer.messageGroupId", d -> d
            .defaultDelay(25000L)
            .delayExpression("headers['delay']"))
        .log()
        .get();
  }


  // Default poller, picked up by every consumer of a PollableChannel
  // (including the bookItemsChannel queue above)
  @Bean(name = PollerMetadata.DEFAULT_POLLER)
  public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(5);
    taskExecutor.initialize();

    pollerMetadata.setTaskExecutor(taskExecutor);
    pollerMetadata.setTrigger(new PeriodicTrigger(15000L)); // poll every 15 seconds
    pollerMetadata.setMaxMessagesPerPoll(3);                // at most 3 messages per poll
    return pollerMetadata;
  }

When messages are received by the pollable channel they are processed one by one with the delay, but across all accounts rather than per account. I want my messages processed one by one, yet in parallel per account, based on the splitter from the s3DownloadFlowEnhanced flow. I know that a pollable channel decouples the sender and the receiver of a message onto different threads. Maybe there is a workaround here?

In the processEachAccountSeparately flow I can see that each account gets its own thread:

    2021-06-24 15:33:34.585  INFO 56174 --- [pool-4-thread-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=[S3FileInfo(fileName=sdfsdf, timeStamp=null, serviceName=null, accountLogin=login2, operationType=ADD)], headers={sequenceNumber=1, correlationId=36b132ac-7c5b-af66-96c0-2334a757c960, id=3bafdaa7-ed4c-087f-5f2c-cc114eae42cd, sequenceSize=2, timestamp=1624538014577}]
    2021-06-24 15:33:34.585  INFO 56174 --- [pool-4-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=[S3FileInfo(fileName=sdfjsjfj, timeStamp=null, serviceName=null, accountLogin=login1, operationType=DELETE), S3FileInfo(fileName=s3/outgoing/file2, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=DELETE), S3FileInfo(fileName=outgoing/s3/ipass.xlsx, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=ADD), S3FileInfo(fileName=dsfsdf, timeStamp=null, serviceName=null, accountLogin=login1, operationType=ADD)], headers={sequenceNumber=2, correlationId=36b132ac-7c5b-af66-96c0-2334a757c960, id=d8506721-2cfd-b6da-d353-4fb8bd5744fb, sequenceSize=2, timestamp=1624538014577}]

However, the PollableChannel consumer processes them one by one:

2021-06-24 15:33:46.328  INFO 56174 --- [lTaskExecutor-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=S3FileInfo(fileName=sdfjsjfj, timeStamp=null, serviceName=null, accountLogin=login1, operationType=DELETE), headers={sequenceNumber=1, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=7f8bd9a6-25ce-0bb2-c3f3-581d823d8fce, sequenceSize=4, timestamp=1624538014585}]
2021-06-24 15:33:46.329  INFO 56174 --- [lTaskExecutor-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=S3FileInfo(fileName=sdfsdf, timeStamp=null, serviceName=null, accountLogin=login2, operationType=ADD), headers={sequenceNumber=1, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 1, 2]], correlationId=3bafdaa7-ed4c-087f-5f2c-cc114eae42cd, id=f697b52b-1053-51aa-232f-88bb602dc1c9, sequenceSize=1, timestamp=1624538014585}]
2021-06-24 15:33:46.329  INFO 56174 --- [lTaskExecutor-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=S3FileInfo(fileName=s3/outgoing/file2, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=DELETE), headers={sequenceNumber=2, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=a6754c98-fce0-f132-664a-65d61f553ae2, sequenceSize=4, timestamp=1624538014585}]
2021-06-24 15:34:01.333  INFO 56174 --- [lTaskExecutor-4] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=S3FileInfo(fileName=outgoing/s3/ipass.xlsx, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=ADD), headers={sequenceNumber=3, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=71fa915a-fcaa-3d00-023b-5cf51be3b183, sequenceSize=4, timestamp=1624538014585}]
2021-06-24 15:34:01.333  INFO 56174 --- [lTaskExecutor-4] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=S3FileInfo(fileName=dsfsdf, timeStamp=null, serviceName=null, accountLogin=login1, operationType=ADD), headers={sequenceNumber=4, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=7c513e23-5484-4f61-b7d3-362648c7b89c, sequenceSize=4, timestamp=1624538014585}]

What I want is to have something like this:

[pool-4-thread-1] simultaneously 
[pool-4-thread-2] simultaneously
[pool-4-thread-2] 20 min delay
[pool-4-thread-2] 20 min delay
[pool-4-thread-2] 20 min delay

Solution

  • Your first step, splitting the list of AccountWithFiles, is correct. Then you say that you'd like to split s3FileInfoList and process those files sequentially, one by one, but why do you place them into a QueueChannel? The regular, default DirectChannel would be enough in this case (a minimal sketch is shown below).

    However, you then go to delay(), which does not block the current thread, but schedules a task for the future on a separate thread. So you probably need to rethink your solution: with the current approach, even if you get rid of that bookItemsChannel as a queue, you are still not going to process the delayed messages sequentially. There is just no guarantee from the TaskScheduler about which task is going to run first when they all have the same scheduled time.
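
    For illustration, here is a minimal sketch of the DirectChannel variant, reusing your bean names (this only removes the queue/poller hand-off so the split files stay on the per-account executor thread; it does not by itself give you the 20 minute spacing, for the reason just explained):

      @Bean
      public MessageChannel bookItemsChannel() {
        // DirectChannel: the consumer runs on the caller's thread, i.e. the
        // pool-4 executor thread that is already dedicated to one account
        return new DirectChannel();
      }

      @Bean
      public IntegrationFlow test() {
        return IntegrationFlows.from("bookItemsChannel")
            .log()
            // a delay() here would hand the message over to the TaskScheduler,
            // which is why it cannot preserve the per-account ordering on its own
            .get();
      }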