I have a Spring SFTP output adapter that I start via "adapter.start()" in my main program. Once started, the adapter transfers and uploads all the files in the specified directory as expected. But I want to stop the adapter after all the files have been transferred. How do I detect if all the files have been transferred so I can issue an adapter.stop()?

@Bean
public IntegrationFlow sftpOutboundFlow() {
    return IntegrationFlows.from(Files.inboundAdapter(new File(sftpOutboundDirectory))
                    .filterExpression("name.endsWith('.pdf') OR name.endsWith('.PDF')")
                    .preventDuplicates(true),
            e -> e.id("sftpOutboundAdapter")
                    .autoStartup(false)
                    .poller(Pollers.trigger(new FireOnceTrigger())
                            .maxMessagesPerPoll(-1)))
            .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getPayload())
            .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getHeaders())
            .handle(Sftp.outboundAdapter(outboundSftpSessionFactory())
                    .useTemporaryFileName(false)
                    .remoteDirectory(sftpRemoteDirectory))
            .get();
}

@Artem Bilan has already given the answer, but here is a concrete implementation of what he described, for those who are as new to Spring Integration as I am:

@Service
public class MyFileService {
    public List<File> getPdfFiles(final String srcDir) {
        File[] files = new File(srcDir).listFiles((dir, name) -> name.toLowerCase().endsWith(".pdf"));
        return Arrays.asList(files == null ? new File[]{} : files);
    }
}
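
A quick way to check the filter in getPdfFiles without starting Spring is the standalone sketch below. It copies the same listFiles filter into a hypothetical PdfFilterDemo class (the class name, the temp directory prefix and the sample file names are assumptions of this sketch, not part of the answer) and runs it against a temporary directory:

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class: plain Java, no Spring required.
final class PdfFilterDemo {

    // Same filter as MyFileService.getPdfFiles, copied so the sketch is self-contained.
    static List<File> getPdfFiles(final String srcDir) {
        File[] files = new File(srcDir).listFiles((dir, name) -> name.toLowerCase().endsWith(".pdf"));
        return Arrays.asList(files == null ? new File[]{} : files);
    }

    // Creates three sample files in a temp directory and returns the sorted names that pass the filter.
    static List<String> matchedNames() {
        try {
            Path dir = Files.createTempDirectory("pdf-filter-demo");
            for (String name : List.of("a.pdf", "B.PDF", "notes.txt")) {
                Files.createFile(dir.resolve(name));
            }
            return getPdfFiles(dir.toString()).stream()
                    .map(File::getName)
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(matchedNames());
        System.out.println(getPdfFiles("/no/such/dir/pdf-filter-demo").isEmpty());
    }
}
```

Both a.pdf and B.PDF pass because the name is lower-cased before the comparison, and a missing directory yields an empty list rather than a NullPointerException.
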

@MessagingGateway
public interface SFtpOutboundGateway {
    @Gateway(requestChannel = "sftpOutboundFlow.input")
    void uploadFiles(List<File> files);
}

Configuration with Sftp.outboundGateway:

@Configuration
@EnableIntegration
public class FtpFlowIntegrationConfig {

    // could also be bound via @Value
    private String sftpRemoteDirectory = "/path/to/remote/dir";

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22222);
        factory.setUser("client1");
        factory.setPassword("password123");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) {
        return e -> e
                .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getPayload)
                .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getHeaders)
                .handle(
                        Sftp.outboundGateway(remoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.MPUT, "payload")
                );
    }

    @Bean
    public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {
        RemoteFileTemplate<ChannelSftp.LsEntry> template = new SftpRemoteFileTemplate(outboundSftpSessionFactory);
        template.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
        template.setAutoCreateDirectory(true);
        // set every property before initializing the template
        template.setUseTemporaryFileName(false);
        template.afterPropertiesSet();
        return template;
    }
}

Wiring up:
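
public class SpringApp {
    public static void main(String[] args) {
        // ctx is the application's ApplicationContext; its creation is not shown in this snippet
        final MyFileService fileService = ctx.getBean(MyFileService.class);
        final SFtpOutboundGateway sFtpOutboundGateway = ctx.getBean(SFtpOutboundGateway.class);
        // trigger the sftp upload flow manually - only once
        // "/path/to/local/dir" is a placeholder for the local source directory
        sFtpOutboundGateway.uploadFiles(fileService.getPdfFiles("/path/to/local/dir"));
    }
}
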
Important notes:

1. @Gateway(requestChannel = "sftpOutboundFlow.input") void uploadFiles(List<File> files);
Here the DirectChannel sftpOutboundFlow.input is used to pass a message with the payload (= List<File> files) to the receiver. If this channel has not been created yet, the gateway creates it implicitly.

2. @Bean public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) { ... }
Since IntegrationFlow is a Consumer functional interface, we can simplify the flow a little by using the IntegrationFlowDefinition. During the bean registration phase, the IntegrationFlowBeanPostProcessor converts this inline (lambda) IntegrationFlow to a StandardIntegrationFlow and processes its components. An IntegrationFlow defined with a lambda gets a DirectChannel as its input channel, and that channel is registered in the application context as a bean named sftpOutboundFlow.input in the sample above (flow bean name + ".input"). That is why we use that name in the SFtpOutboundGateway gateway.
Ref: https://spring.io/blog/2014/11/25/spring-integration-java-dsl-line-by-line-tutorial

3. @Bean public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {}
See: Remote directory for sftp outbound gateway with DSL
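
Note 1 matters for the original question because a direct channel hands the message to its subscriber on the sender's thread, so the send call returns only after the handler has finished. The sketch below is a plain-Java model of that behaviour, not Spring's DirectChannel; the DirectChannelSketch and DirectChannelDemo classes and the "uploaded ..." strings are hypothetical names made up for the illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical demo: records the order in which the handler and the sender make progress.
final class DirectChannelDemo {

    static List<String> run(List<String> fileNames) {
        List<String> events = new ArrayList<>();
        DirectChannelSketch<List<String>> channel = new DirectChannelSketch<>();
        // The subscriber stands in for the upload step of the flow.
        channel.subscribe(files -> files.forEach(f -> events.add("uploaded " + f)));
        channel.send(fileNames);
        // Reached only after the subscriber has processed every file.
        events.add("send returned");
        return events;
    }

    public static void main(String[] args) {
        run(List.of("a.pdf", "b.pdf")).forEach(System.out::println);
    }
}

// Minimal stand-in for a direct channel: the handler runs in the sender's thread.
final class DirectChannelSketch<T> {
    private Consumer<T> handler;

    void subscribe(Consumer<T> handler) {
        this.handler = handler;
    }

    // Invokes the subscriber synchronously and returns once it has finished.
    void send(T payload) {
        if (handler == null) {
            throw new IllegalStateException("no subscriber");
        }
        handler.accept(payload);
    }
}
```

In the real flow the same ordering is what should let main treat the return of uploadFiles as "the upload has run", as long as no asynchronous channel (for example an executor or queue channel) is placed between the gateway and the SFTP handler.
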
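
Flowchart: SpringApp.main -> MyFileService.getPdfFiles -> SFtpOutboundGateway.uploadFiles -> sftpOutboundFlow.input (DirectChannel) -> log payload and headers -> Sftp.outboundGateway (MPUT) -> remote directory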