I am new to Spring Intergation. I need to process files from sftp server from user1/upload directory and move them to user1/processed drectory after that. My code workes overall ok, but i have two problems:
When i restart my application, directory user1/processed is deleted as well as all files that have been there earlier. I want just to write more files there, not to empty directory every time.
Every time i start my application i receive files (as i see for their names i print to console) i get old files, thet were already moved to processed directory. It seem really weird, because i cannot see these files when connecting to sftp by some other tools like WinSCP. Are that old file list cashed somewhere?
@Value("${cielo.sftp.host}")
private String sftpHost;
@Value("${cielo.sftp.port}")
private int sftpPort;
@Value("${cielo.sftp.user}")
private String sftpUser;
@Value("${cielo.sftp.pass}")
private String sftpPasword;
@Value("${cielo.sftp.remotedir}")
private String sftpRemoteDirectoryDownload;
@Value("${cielo.sftp.localdir}")
private String sftpLocalDirectoryDownload;
@Value("${cielo.sftp.filter}")
private String sftpRemoteDirectoryDownloadFilter;
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(sftpHost);
factory.setPort(sftpPort);
factory.setUser(sftpUser);
factory.setPassword(sftpPasword);
factory.setAllowUnknownKeys(true); //Set to true to allow connections to hosts with unknown (or changed) keys. Its default is 'false'. If false, a pre-populated knownHosts file is required.
return new CachingSessionFactory<>(factory);
}
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE - 1)
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer(final SessionFactory<LsEntry> sftpSessionFactory) {
SftpInboundFileSynchronizer fileSynchronizer =
new SftpInboundFileSynchronizer(sftpSessionFactory);
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory(sftpRemoteDirectoryDownload);
fileSynchronizer
.setFilter(new SftpSimplePatternFileListFilter(sftpRemoteDirectoryDownloadFilter)); //todo maybe use RegexPatternFileListFilter?
return fileSynchronizer;
}
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE - 2)
@InboundChannelAdapter(channel = "fromSftpChannel", poller = @Poller(fixedDelay = "1000"))
//@InboundChannelAdapter(channel = "fromSftpChannel", poller = @Poller(cron = "${cielo.sftp.poller.cron}"))
public MessageSource<File> sftpMessageSource(final SftpInboundFileSynchronizer sftpInboundFileSynchronizer) {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer);
source.setLocalDirectory(new File("/tmp/local"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
return source;
}
@Bean
@ServiceActivator(
inputChannel = "fromSftpChannel")
public MessageHandler resultFileHandler() {
return new MessageHandler() {
@Override
public void handleMessage(final Message<?> message) throws MessagingException {
String payload = String.valueOf(message.getPayload());
System.err.println(payload);
}
};
}
private static final SpelExpressionParser PARSER = new SpelExpressionParser();
@Bean(name="fromSftpChannel")
public MessageChannel fromSftpChannel() {
return new PublishSubscribeChannel();
}
@Bean
@ServiceActivator(inputChannel = "fromSftpChannel")
@Order(Ordered.LOWEST_PRECEDENCE)
public MessageHandler moveFile() {
SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), AbstractRemoteFileOutboundGateway.Command.MV.getCommand(), "'/user1/upload/'.concat(" + PARSER.parseExpression("payload.getName()").getExpressionString() + ")");
sftpOutboundGateway.setRenameExpressionString("'/user1/processed/'.concat(" + PARSER.parseExpression("payload.getName()").getExpressionString() + ")");
sftpOutboundGateway.setRequiresReply(false);
sftpOutboundGateway.setOutputChannelName("nullChannel");
sftpOutboundGateway.setOrder(Ordered.LOWEST_PRECEDENCE);
sftpOutboundGateway.setAsync(true);
return sftpOutboundGateway;
}
Thank for any help!
I looked at example at Spring integration - SFTP rename or move file in remote server after copying and that helped me a lot
I also checked official spring intergation sftp documentation
I'm not sure how the remote directory is cleared up on start up. Need to debug that behavior on your side. But I can say you why you see old files. You do remote rename after process, but local copies of those files are still stored on the source.setLocalDirectory(new File("/tmp/local"));
. Consider to clean up it when you done with renaming or on restart.
You also may look into an SftpStreamingMessageSource
instead for your logic: https://docs.spring.io/spring-integration/reference/sftp/streaming.html