java, spring-boot, spring-integration

PipedOutputStream/PipedInputStream upload to SFTP using Spring Integration


I'd like to stream data to an SFTP server in several batches. I'm using Spring Boot with Spring Integration. I've set up an SftpRemoteFileTemplate like this:

  @Autowired private SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory;

  @Bean
  public SftpRemoteFileTemplate sftpRemoteFileTemplate() {
    final SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(sftpSessionFactory);

    template.setRemoteDirectoryExpression(
            new LiteralExpression(contactSenderSftpProperties.getSftpSessionProperties().getBaseSftpPath()));

    template.setTemporaryFileSuffix(".tmp");

    return template;
  }

However, the destination file is not appended to; it is overwritten with the latest contents of the temporary file I'm sending data to.

My writer looks like this:

  public void write(List<? extends Item> items) throws Exception {
    log.debug("Write {}", items);

    final int timeoutSeconds = 60;

    try (PipedInputStream pipedInputStream = new PipedInputStream()) {
      log.debug("Preparing to write...");
      final CountDownLatch countDownLatch = new CountDownLatch(1);

      writerClient.write(items, pipedInputStream, countDownLatch);

      if (!countDownLatch.await(timeoutSeconds, TimeUnit.SECONDS)) {
        throw new TimeoutException("Operation stream not connected");
      }

      sftpRemoteFileTemplate.send(
              MessageBuilder.withPayload(pipedInputStream).setHeader(FileHeaders.FILENAME, "contacts.csv").build());

    }
  }

where the WriterClient#write method is

  @Async("writerThreadPoolTaskExecutor")
  public void write(List<? extends Item> items, PipedInputStream pipedInputStream, CountDownLatch countDownLatch) throws IOException {
    try(final PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream)){
      countDownLatch.countDown();
      csvSerializer.serialize(pipedOutputStream, items.stream());
    }
  }

and writerThreadPoolTaskExecutor is

  @Bean
  public ThreadPoolTaskExecutor writerThreadPoolTaskExecutor(TaskExecutorBuilder taskExecutorBuilder) {
    return taskExecutorBuilder
            .corePoolSize(properties.getWriterThreadPoolCorePoolSize())
            .maxPoolSize(properties.getWriterThreadPoolMaxPoolSize())
            .queueCapacity(properties.getWriterThreadPoolQueueCapacity())
            .threadNamePrefix("writer-task-thread")
            .build();
  }

In a nutshell, I'd like to write many small temporary files and merge them into one file containing all the data. I'm not really sure about PipedInputStream/PipedOutputStream. Is it possible to write to the PipedOutputStream many times and upload the PipedInputStream to SFTP only once, when there is no more data to write? But then the question arises: how do I know that all the data has been written?
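
On the last point: with the JDK piped streams, the reading side sees end-of-stream (read returns -1) only after the writing side closes the PipedOutputStream, so closing it is the "all data written" signal. The following is a minimal JDK-only sketch of pushing several batches through one pipe. The class name PipedBatchDemo and the helper pipeBatches are hypothetical, and a ByteArrayOutputStream stands in for the SFTP upload.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

class PipedBatchDemo {

    // Hypothetical helper: writes every batch into ONE pipe and returns what the reader received.
    static String pipeBatches(List<List<String>> batches) {
        try {
            PipedInputStream in = new PipedInputStream();
            // Connect both ends before starting the writer thread, so no latch is needed.
            PipedOutputStream out = new PipedOutputStream(in);

            Thread writer = new Thread(() -> {
                // try-with-resources closes the output side; that close is the end-of-data signal.
                try (PipedOutputStream o = out) {
                    for (List<String> batch : batches) {
                        for (String line : batch) {
                            o.write((line + "\n").getBytes(StandardCharsets.UTF_8));
                        }
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();

            // Stand-in for the upload: read until the writer has closed its end.
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            try (PipedInputStream i = in) {
                byte[] buffer = new byte[1024];
                int n;
                while ((n = i.read(buffer)) != -1) {
                    sink.write(buffer, 0, n);
                }
            }
            writer.join();
            return new String(sink.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String result = pipeBatches(Arrays.asList(
                Arrays.asList("a,1", "b,2"),
                Arrays.asList("c,3")));
        System.out.print(result); // prints the three lines a,1 / b,2 / c,3
    }
}
```

The reader and writer must run on different threads, as in the code above; a single thread doing both can deadlock once the pipe's internal buffer fills.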


Solution

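  • Take a look at the SFTP Outbound Channel Adapter and its APPEND mode: https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#spel-and-the-sftp-outbound-adapter. By default the remote file is replaced (FileExistsMode.REPLACE), which is why each send overwrites contacts.csv.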
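
For the SftpRemoteFileTemplate from the question, appending might look roughly like the configuration sketch below. It assumes a Spring Integration 5.x setup with the JSch-based ChannelSftp.LsEntry session factory, as in the question. The class name SftpAppendConfig, the "/upload" directory and the appendBatch helper are placeholders, not part of the original code. Note that the template refuses to append while a temporary file name is in use, so that feature is switched off here instead of setting a ".tmp" suffix.

```java
import java.io.InputStream;

import com.jcraft.jsch.ChannelSftp;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
import org.springframework.integration.support.MessageBuilder;

@Configuration
public class SftpAppendConfig { // hypothetical class name

    @Bean
    public SftpRemoteFileTemplate sftpRemoteFileTemplate(
            SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory) {
        SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(sftpSessionFactory);
        // "/upload" is a placeholder for the base SFTP path from the question's properties.
        template.setRemoteDirectoryExpression(new LiteralExpression("/upload"));
        // APPEND cannot be combined with a temporary file name, so disable it.
        template.setUseTemporaryFileName(false);
        return template;
    }

    // Hypothetical helper: each call appends one batch to the same remote file.
    public static void appendBatch(SftpRemoteFileTemplate template, InputStream batch) {
        template.send(
                MessageBuilder.withPayload(batch)
                        .setHeader(FileHeaders.FILENAME, "contacts.csv")
                        .build(),
                FileExistsMode.APPEND);
    }
}
```

With this arrangement each write call can send its own batch, and no merge step for temporary files is needed. The trade-off is that a reader on the server side may observe a partially written contacts.csv between batches.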