I'd like to stream data in couple of goes to the sftp server. I'm using spring boot integration. I've setted up SftpRemoteFileTemplate like this
@Autowired private SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory;
@Bean
public SftpRemoteFileTemplate sftpRemoteFileTemplate() {
final SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(sftpSessionFactory);
template.setRemoteDirectoryExpression(
new LiteralExpression(contactSenderSftpProperties.getSftpSessionProperties().getBaseSftpPath()));
template.setTemporaryFileSuffix(".tmp");
return template;
}
but my destination file is not appended instead overwritten by the latest contents of temporary file I'm sending data to.
My writer looks like this
public void write(List<? extends Item> items) throws Exception {
log.debug("Write {}", items);
final int timeoutSeconds = 60;
try (PipedInputStream pipedInputStream = new PipedInputStream()) {
log.debug("Preparing to write...");
final CountDownLatch countDownLatch = new CountDownLatch(1);
writerClient.write(items, pipedInputStream, countDownLatch);
if (!countDownLatch.await(timeoutSeconds, TimeUnit.SECONDS)) {
throw new TimeoutException("Operation stream not connected");
}
sftpRemoteFileTemplate.send(
MessageBuilder.withPayload(pipedInputStream).setHeader(FileHeaders.FILENAME, "contacts.csv").build());
}
}
where method WriterClient#write
@Async("writerThreadPoolTaskExecutor")
public void write(List<? extends Item> items, PipedInputStream pipedInputStream, CountDownLatch countDownLatch) throws IOException {
try(final PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream)){
countDownLatch.countDown();
csvSerializer.serialize(pipedOutputStream, items.stream());
}
}
and writerThreadPoolTaskExecutor
@Bean
public ThreadPoolTaskExecutor writerThreadPoolTaskExecutor(TaskExecutorBuilder taskExecutorBuilder) {
return taskExecutorBuilder
.corePoolSize(properties.getWriterThreadPoolCorePoolSize())
.maxPoolSize(properties.getWriterThreadPoolMaxPoolSize())
.queueCapacity(properties.getWriterThreadPoolQueueCapacity())
.threadNamePrefix("writer-task-thread")
.build();
}
In nutshell I'd like to write many small temporary files and merge them to one containing all the data. I'm not really sure about PipedInput/OutputStream. Is it possible to append PipedOutputStream many times and upload PipedInputStream only once to sftp when there is no more data to write ? But then question arises how do I know if all the date was written ?
Take a look into an SFTP Outbound Channel Adapter and its APPEND
mode: https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#spel-and-the-sftp-outbound-adapter