Tags: java, spring, spring-integration, spring-integration-dsl, spring-integration-sftp

How to stop an IntegrationFlow once it has parsed all the files on an SFTP server, and re-run it the next day with a @Scheduled method


I have a StandardIntegrationFlow which reads files from a number of folders on an SFTP server, filters them, and then processes the ones that pass the filter. My files are laid out like this: /home/{userId}/files/...

@Bean
  public StandardIntegrationFlow readingFilesAndProcess(
      SessionFactory<ChannelSftp.LsEntry> sessionFactory,
      PollerMetadata sftpStreamingPoller,
      RecursiveSftpRemoteFileTemplate recursiveReceiptsRemoteFileTemplate,
      FileeValidator fileValidator) {
    return IntegrationFlows.from(
            Sftp.inboundStreamingAdapter(recursiveReceiptsRemoteFileTemplate)
                .remoteDirectory("/home"),
            e -> e.poller(sftpStreamingPoller).autoStartup(false))
        .filter(Message.class, fileValidator::isFileValid)
        .handle(
            message -> {
              log.info("Handling message at: {}", LocalDateTime.now());
              handlingFile(message);
            })
        .get();
  }

I have created a recursive RemoteFileTemplate and I can successfully parse all the files for all the users.

@Bean
  public RecursiveSftpRemoteFileTemplate recursiveReceiptsRemoteFileTemplate(
      SessionFactory<ChannelSftp.LsEntry> sessionFactory,
      SftpPersistentAcceptOnceFileListFilter sftpPersistentAcceptOnceFileListFilter) {
    final RecursiveSftpRemoteFileTemplate template =
        new RecursiveSftpRemoteFileTemplate(
            sessionFactory,
            1,
            file -> sftpPersistentAcceptOnceFileListFilter.accept(file),
            List.of("files"));
    template.setRemoteDirectoryExpression(new LiteralExpression("/home"));
    return template;
  }

To trigger the integration flow I am using this function:

@Scheduled(cron = "0 0 0 * * *", zone = "ECT")
  public void triggeringFlow() {
    log.info("Triggering the IntegrationFlow");
    readingFilesAndProcess.start();
  }

My issue is that the first time the readingFilesAndProcess flow is triggered, it parses all the files, but the second, third, ... time it doesn't do anything (I checked the logs).

Is there a way to "stop" the flow once it has parsed all the files, so that the next time it is triggered it goes through all the files again?

I am using spring-integration 5.5.15.


Solution

  • You don't need the @Scheduled approach, and you don't need to stop or restart the Inbound Channel Adapter. It is enough to configure the poller with a cron expression so that it runs once a day.
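
A minimal sketch of such a poller, assuming the sftpStreamingPoller bean from the question is built with the Pollers factory of the Java DSL. The class name SftpPollerConfig is hypothetical, and the cron expression and zone are copied from the question's @Scheduled method. The maxMessagesPerPoll(-1) setting is an assumption added here so that a single daily poll keeps reading until no more files are available, since an inbound channel adapter otherwise emits one message per poll by default:

```java
import java.util.TimeZone;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.scheduling.PollerMetadata;

// Hypothetical configuration class; only the bean name comes from the question.
@Configuration
public class SftpPollerConfig {

  @Bean
  public PollerMetadata sftpStreamingPoller() {
    // Fire once a day at midnight, using the same cron and zone as the
    // question's @Scheduled method.
    return Pollers.cron("0 0 0 * * *", TimeZone.getTimeZone("ECT"))
        // Assumption: a negative value lets one poll cycle emit every
        // available file instead of a single message.
        .maxMessagesPerPoll(-1)
        .get();
  }
}
```

With a poller like this, the triggeringFlow() method and the autoStartup(false) setting on the adapter would no longer be needed, because nothing has to call start() by hand. Note also that SftpPersistentAcceptOnceFileListFilter is designed to pass each file only once, so files it has already accepted would presumably still be skipped on later polls unless they change on the server.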