
spring-integration-aws dynamic file download


I need to download a file from S3 based on the content of a message. In other words, the file to download is not known in advance; I have to search for it at runtime. S3StreamingMessageSource doesn't seem to be a good fit because:

  1. It relies on polling, whereas I need to wait for a message.
  2. I can't find any way to create a S3StreamingMessageSource dynamically in the middle of a flow. gateway(IntegrationFlow) looks interesting but what I need is a gateway(Function<Message<?>, IntegrationFlow>) that doesn't exist.

Another candidate is S3MessageHandler, but it has no support for listing files, which I need in order to find the desired file.

I can implement my own message handler using the AWS API directly; I'm just wondering whether I'm missing something, because this doesn't seem like an unusual requirement. After all, not every app just sits there and keeps polling S3 for new files.


Solution

  • For anyone coming across this question, this is what I did. The trick is to:

    1. Set the filter later, per message, not at construction time. Note that there is no addFilters or getFilters method on the source, only setFilter, so you can't append to an existing filter; the whole chain has to be replaced each time. @artem-bilan, this is inconvenient.
    2. Call S3StreamingMessageSource.receive manually instead of letting a poller do it.

      .handle(String.class, (fileName, h) -> {
          if (messageSource instanceof S3StreamingMessageSource) {
              S3StreamingMessageSource s3StreamingMessageSource = (S3StreamingMessageSource) messageSource;

              // Rebuild the whole chain for this message; the source only exposes setFilter
              ChainFileListFilter<S3ObjectSummary> chainFileListFilter = new ChainFileListFilter<>();
              chainFileListFilter.addFilters(
                      new S3SimplePatternFileListFilter("**/*/*.json.gz"),
                      new S3PersistentAcceptOnceFileListFilter(metadataStore, ""),
                      new S3FileListFilter(fileName) // restricts the listing using the message payload
              );
              s3StreamingMessageSource.setFilter(chainFileListFilter);

              // Fetch on demand rather than waiting for a poller
              return s3StreamingMessageSource.receive();
          }
          log.warn("Expected: {} but got: {}.",
                  S3StreamingMessageSource.class.getName(), messageSource.getClass().getName());
          return messageSource.receive();
      }, spec -> spec
          .requiresReply(false) // in case all messages got filtered out
      )
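
The snippet above doesn't show S3FileListFilter. Assuming it is a custom filter that accepts only the object whose name matches the message payload, the idea can be sketched with plain Java types. Everything below (ExactKeyFilterSketch, exactFileName, filterKeys, the sample keys) is hypothetical and only models the "every filter in the chain must accept" behavior; a real version would most likely extend Spring Integration's AbstractFileListFilter<S3ObjectSummary> and compare against the summary's key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for the S3FileListFilter used above; not library code.
final class ExactKeyFilterSketch {

    // Accepts a key only when its last path segment equals the requested file name.
    static Predicate<String> exactFileName(String fileName) {
        return key -> {
            int slash = key.lastIndexOf('/');
            String lastSegment = slash >= 0 ? key.substring(slash + 1) : key;
            return lastSegment.equals(fileName);
        };
    }

    // Models a filter chain: a key survives only if every filter accepts it.
    @SafeVarargs
    static List<String> filterKeys(List<String> keys, Predicate<String>... filters) {
        List<String> accepted = new ArrayList<>();
        for (String key : keys) {
            boolean ok = true;
            for (Predicate<String> filter : filters) {
                if (!filter.test(key)) {
                    ok = false;
                    break;
                }
            }
            if (ok) {
                accepted.add(key);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Made-up keys standing in for an S3 bucket listing.
        List<String> listing = List.of(
                "logs/2020/a.json.gz", "logs/2020/b.json.gz", "logs/2020/a.txt");
        Predicate<String> gzOnly = key -> key.endsWith(".json.gz");
        System.out.println(filterKeys(listing, gzOnly, exactFileName("a.json.gz")));
    }
}
```

If no key passes every filter, the list is empty, which corresponds to receive() returning null in the flow above; that is why requiresReply(false) is set.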