I am currently processing .csv files from a remote SFTP server, transforming them, and placing them on a Kafka topic. I know the files can be deleted after they are processed when using an Sftp.inboundAdapter. Is there a way to get the same functionality with the Sftp.inboundStreamingAdapter? Would I need to add another flow, or can I delete the files using the properties already provided?
I can provide more of the flow's code if needed. Thanks.
@Bean
public IntegrationFlow sftpFileTransferFlow(SessionFactory<SftpClient.DirEntry> sftpSessionFactory,
        IntegrationFlowProperties properties,
        MessageChannel inboundFilesMessageChannel) {
    return IntegrationFlow
            .from(Sftp.inboundStreamingAdapter(new RemoteFileTemplate<>(sftpSessionFactory))
                            .filter(new SftpRegexPatternFileListFilter(properties.getRemoteFilePattern()))
                            .remoteDirectory(properties.getRemoteDirectory()),
                    e -> e.id("sftpInboundAdapter")
                            .autoStartup(true)
                            .poller(Pollers.fixedRate(5000)))
            .log(LoggingHandler.Level.DEBUG, "DataSftpToKafkaIntegrationFlow",
                    "headers['file_remoteDirectory'] + T(java.io.File).separator + headers['file_remoteFile']")
            .channel(inboundFilesMessageChannel)
            .get();
}
EDIT:
@Bean
public IntegrationFlow readCsvFileFlow(MessageChannel inboundFilesMessageChannel,
        QueueChannel kafkaPojoMessageChannel) {
    return IntegrationFlow.from(inboundFilesMessageChannel)
            .split(Files.splitter()
                    .markers(true)
                    .charset(StandardCharsets.UTF_8)
                    .firstLineAsHeader("myHeaders")
                    .applySequence(true))
            // .transform(new StreamToEmailInteractionConverter()) // TODO: Figure this out so it doesn't get put on topic as .csv
            .transform(new ObjectToJsonTransformer())
            .log(LoggingHandler.Level.DEBUG,
                    "DataSftpToKafkaIntegrationFlow",
                    m -> "Payload: " + m.getPayload())
            .channel(kafkaPojoMessageChannel)
            .get();
}
@Bean
public IntegrationFlow publishToKafkaFlow(KafkaTemplate<String, String> kafkaTemplate,
        MessageChannel kafkaProducerErrorRecordChannel,
        QueueChannel kafkaPojoMessageChannel) {
    return IntegrationFlow.from(kafkaPojoMessageChannel)
            .log(LoggingHandler.Level.DEBUG,
                    "DataSftpToKafkaIntegrationFlow", e -> "Payload: " + e.getPayload())
            .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                            .topic(KAFKA_TOPIC),
                    e -> e.id("KafkaProducer"))
            .routeByException(r -> r
                    .channelMapping(KafkaProducerException.class, kafkaProducerErrorRecordChannel)
                    .defaultOutputChannel("errorChannel"))
            .get();
}
After this entire flow is completed, I would like to delete the file we just processed from the remote SFTP server.
The AbstractInboundFileSynchronizingMessageSource works in two phases: it first copies remote files into a local directory and then emits messages for those local files. The deleteRemoteFiles option applies to that remote-to-local copy phase: once a file has been synchronized, the remote copy is no longer needed and everything downstream deals only with the local copy.
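For contrast, here is a minimal sketch of that synchronizing (non-streaming) adapter, where deleteRemoteFiles(true) removes the remote copy once it has been synchronized to the local directory; the remote directory, file pattern, and local directory shown here are placeholders, not values from the question:

@Bean
public IntegrationFlow sftpSynchronizingFlow(SessionFactory<SftpClient.DirEntry> sftpSessionFactory,
        MessageChannel inboundFilesMessageChannel) {
    return IntegrationFlow
            .from(Sftp.inboundAdapter(sftpSessionFactory)
                            .remoteDirectory("/remote/csv")                   // placeholder
                            .regexFilter(".*\\.csv")                          // placeholder
                            .localDirectory(new File("build/sftp-local"))     // placeholder
                            .deleteRemoteFiles(true),                         // remote copy removed after sync
                    e -> e.poller(Pollers.fixedRate(5000)))
            .channel(inboundFilesMessageChannel)
            .get();
}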
The streaming source, on the other hand, opens an InputStream for each remote file and keeps it open until you close it manually. That's why there is no explicit option to delete remote files.
You can perform the remote file removal with an SftpOutboundGateway using the rm command. This gateway can be wired as a second subscriber to that inboundFilesMessageChannel if you make it a PublishSubscribeChannel, and it should run only after the IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE from the message headers has been closed.
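A minimal sketch of that second-subscriber approach, assuming inboundFilesMessageChannel is redefined as a PublishSubscribeChannel, that the CSV flow is the first subscriber and has fully consumed the stream before this flow runs, and that the bean names here are placeholders:

@Bean
public MessageChannel inboundFilesMessageChannel() {
    // Publish-subscribe so both the CSV-processing flow and this cleanup flow receive the message
    return new PublishSubscribeChannel();
}

@Bean
public IntegrationFlow removeRemoteFileFlow(SessionFactory<SftpClient.DirEntry> sftpSessionFactory,
        MessageChannel inboundFilesMessageChannel) {
    return IntegrationFlow.from(inboundFilesMessageChannel)
            // Close the session backing the streamed InputStream before removing the remote file
            .handle(Object.class, (payload, headers) -> {
                Closeable remoteSession =
                        (Closeable) headers.get(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE);
                if (remoteSession != null) {
                    try {
                        remoteSession.close();
                    }
                    catch (IOException ex) {
                        throw new UncheckedIOException(ex);
                    }
                }
                return payload;
            })
            // 'rm' the file we just processed, located via the standard remote-file headers
            .handle(Sftp.outboundGateway(sftpSessionFactory,
                    AbstractRemoteFileOutboundGateway.Command.RM,
                    "headers['file_remoteDirectory'] + '/' + headers['file_remoteFile']"))
            .log(LoggingHandler.Level.DEBUG, "DataSftpToKafkaIntegrationFlow",
                    m -> "Remote file removed: " + m.getPayload())
            .get();
}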
However, I think we could come up with an option like removeFileOnClose on the AbstractRemoteFileStreamingMessageSource and perform the rm when the IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE is closed.