I have implemented a service that gets a file from, puts a file to, and removes a file from an SFTP server, based on the SftpRemoteFileTemplate from Spring Integration.
Here sftpGetPayload gets a file from the SFTP server and delivers its content.
This is my code so far:
public String sftpGetPayload(String sessionId,
String host, int port, String user, String password,
String remoteDir, String remoteFilename, boolean remoteRemove) {
LOG.info("sftpGetPayload sessionId={}", sessionId);
LOG.debug("sftpGetPayload host={}, port={}, user={}", host, port, user);
LOG.debug("sftpGetPayload remoteDir={}, remoteFilename={}, remoteRemove={}",
remoteDir, remoteFilename, remoteRemove);
final AtomicReference<String> refPayload = new AtomicReference<>();
SftpRemoteFileTemplate template = getSftpRemoteFileTemplate(host, port,
user, password, remoteDir, remoteFilename);
template.get(remoteDir + "/" + remoteFilename,
is -> refPayload.set(getAsString(is)));
LOG.info("sftpGetToFile {} read.", remoteDir + "/" + remoteFilename);
deleteRemoteFile(template, remoteDir, remoteFilename, remoteRemove);
return refPayload.get();
}
private SftpRemoteFileTemplate getSftpRemoteFileTemplate(String host, int port,
String user, String password, String remoteDir, String remoteFilename) {
SftpRemoteFileTemplate template =
new SftpRemoteFileTemplate(sftpSessionFactory(host, port, user, password));
template.setFileNameExpression(
new LiteralExpression(remoteDir + "/" + remoteFilename));
template.setRemoteDirectoryExpression(new LiteralExpression(remoteDir));
//template.afterPropertiesSet();
return template;
}
private void deleteRemoteFile(SftpRemoteFileTemplate template,
String remoteDir, String remoteFilename, boolean remoteRemove) {
LOG.debug("deleteRemoteFile remoteRemove={}", remoteRemove);
if (remoteRemove) {
template.remove(remoteDir + "/" + remoteFilename);
LOG.info("sftpGetToFile {} removed.", remoteDir + "/" + remoteFilename);
}
}
All of these GET operations are active: they assume the file is already on the server. I would like a polling process instead, one that calls my payload-consuming method as soon as a file arrives on the SFTP server.
I have found another implementation based on Spring beans, configured with Spring Integration annotations, which declares an SftpSessionFactory, an SftpInboundFileSynchronizer, an SftpMessageSource, and a MessageHandler. It polls an SFTP site for incoming files and automatically invokes the message handler for further processing.
This code is as follows:
@Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(myHost);
factory.setPort(myPort);
factory.setUser(myUser);
factory.setPassword(myPassword);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<LsEntry>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory(myRemotePath);
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter(myFileFilter));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
sftpInboundFileSynchronizer());
source.setLocalDirectory(myLocalDirectory);
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
How can I include this @Poller/MessageHandler/@ServiceActivator concept into my implementation above? Or is there a way to implement this feature in the template-based implementation?
The scenario is as follows:
I have a Spring Boot Application with several classes which represent tasks. Some of those tasks are called automatically via the Spring @Scheduled annotation and a CRON specification, others are not.
@Scheduled(cron = "${task.to.start.automatically.frequency}")
public void runAsTask() {
...
}
The first task starts according to its @Scheduled specification, gets a file from the SFTP server, and processes it. It does that with its own channel (host1, port1, user1, password1, remoteDir1, remoteFile1).
The second task is also run by the scheduler and generates something to put on the SFTP server. It does that with its own channel (host2, port2, user2, password2, remoteDir2, remoteFile2). Very likely host2 = host1 and port2 = port1, but that is not required.
The third task is also run by the scheduler and generates something to put on the SFTP server. It uses the same channel as task1, but it is a producer (not a consumer like task1) and writes a different file than task1 (host1, port1, user1, password1, remoteDir3, remoteFile3).
The fourth task has no @Scheduled annotation, because it should notice when the file it has to process has been received from a third party and is therefore available on its channel (host4, port4, user4, password4, remoteDir4, remoteFile4), and then get the content and process it.
I have read the whole Integration documentation, but it is hard to apply to this use case: both translating the XML configuration schemes into Java configuration with annotations, and moving from the rather static Spring bean approach to a purely dynamic approach at runtime.
My understanding is that I should use an IntegrationFlow to register the artifacts: an inbound adapter for task1, an outbound adapter for task2, an inbound adapter for task3 with the same session factory as task1 (registered somewhere else), and, last but not least, an inbound adapter with a poller for task4. Or should all of them be gateways with their command feature? Or should I register an SftpRemoteFileTemplate?
To define the channel I have:
public class TransferChannel {
private String host;
private int port;
private String user;
private String password;
/* getters, setters, hash, equals, and toString */
}
To have all SFTP settings together, I have:
public class TransferContext {
private boolean enabled;
private TransferChannel channel;
private String remoteDir;
private String remoteFilename;
private boolean remoteRemove;
private String remoteFilenameFilter;
private String localDir;
/* getters, setters, hash, equals, and toString */
}
At the heart of the SFTP processing, each job will inject a kind of DynamicSftpAdapter:
// injected as a field of the task class; @Autowired is not allowed inside a method body
@Autowired
DynamicSftpAdapter sftp;
@Scheduled(cron = "${task.to.start.automatically.frequency}")
public void runAsTask() {
...
sftp.connect("Task1", context);
File f = sftp.getFile("Task1", "remoteDir", "remoteFile");
/* process file content */
sftp.removeFile("Task1", "remoteDir", "remoteFile");
sftp.disconnect("Task1", context);
}
The DynamicSftpAdapter is not much more than a fragment yet:
@Component
public class DynamicSftpAdapter {
private static final Logger LOG = LoggerFactory.getLogger(DynamicSftpAdapter.class);
@Autowired
private IntegrationFlowContext flowContext;
@Autowired
private ApplicationContext appContext;
private final Map<TransferChannel, IntegrationFlowRegistration> registrations = new HashMap<>();
private final Map<String, TransferContext> sessions = new ConcurrentHashMap<>();
@Override
public void connect(String sessionId, TransferContext context) {
if (this.registrations.containsKey(context.getChannel())) {
LOG.debug("connect, channel exists for {}", sessionId);
}
else {
// register the required SFTP Outbound Adapter
TransferChannel channel = context.getChannel();
IntegrationFlow flow = f -> f.handle(Sftp.outboundAdapter(cashedSftpSessionFactory(
channel.getHost(), channel.getPort(),
channel.getUser(), channel.getPassword())));
this.registrations.put(channel, flowContext.registration(flow).register());
this.sessions.put(sessionId, context);
LOG.info("sftp session {} for {} started", sessionId, context);
}
}
private DefaultSftpSessionFactory sftpSessionFactory(String host, int port, String user, String password) {
LOG.debug("sftpSessionFactory");
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
return factory;
}
private CachingSessionFactory<LsEntry> cashedSftpSessionFactory(String host, int port, String user, String password) {
LOG.debug("cashedSftpSessionFactory");
CachingSessionFactory<LsEntry> cashedSessionFactory =
new CachingSessionFactory<LsEntry>(
sftpSessionFactory(host, port, user, password));
return cashedSessionFactory;
}
@Override
public void sftpGetFile(String sessionId, String remoteDir, String remoteFilename) {
TransferContext context = sessions.get(sessionId);
if (context == null)
throw new IllegalStateException("Session not established, sessionId " + sessionId);
IntegrationFlowRegistration register = registrations.get(context.getChannel());
if (register != null) {
try {
LOG.debug("sftpGetFile get file {}", remoteDir + "/" + remoteFilename);
register.getMessagingTemplate().send(
MessageBuilder.withPayload(msg)
.setHeader(...).build());
}
catch (Exception e) {
appContext.getBean(context, DefaultSftpSessionFactory.class)
.close();
}
}
}
@Override
public void disconnect(String sessionId, TransferContext context) {
IntegrationFlowRegistration registration = this.registrations.remove(context.getChannel());
if (registration != null) {
registration.destroy();
}
LOG.info("sftp session for {} finished", context);
}
}
I do not understand how to initiate an SFTP command. I also do not understand whether, when using an OutboundGateway where the SFTP command (like GET) has to be specified up front, the whole SFTP handling would live in one method: specifying the outbound gateway factory, getting an instance with get(), and probably calling .get() on the message in some way.
Obviously I need help.
First of all, if you already use Spring Integration channel adapters, there is probably no reason to use a low-level API like RemoteFileTemplate directly.
Secondly, there is a technical discrepancy: the SftpInboundFileSynchronizingMessageSource produces a local file, a complete copy of the remote file. So your SftpRemoteFileTemplate logic would not work well downstream, because what arrives there is already a local file (java.io.File), not a representation of the remote file.
Even if the logic in your sftpGetPayload() doesn't look complicated or custom enough to require such a separate method, it is better to have the SftpRemoteFileTemplate as a singleton and share it between different components when you work against the same SFTP server. It is just a stateless, straightforward implementation of the Spring template pattern.
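To illustrate the "one shared instance per SFTP server" idea, here is a minimal, library-free sketch. The names ChannelKey and PerChannelCache are made up for this example; in the real application the cached type T would presumably be an SftpRemoteFileTemplate, and the factory function would build it from the session factory for that host, port, and user.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical connection key; equals/hashCode make it usable as a map key. */
final class ChannelKey {
    final String host;
    final int port;
    final String user;

    ChannelKey(String host, int port, String user) {
        this.host = host;
        this.port = port;
        this.user = user;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ChannelKey)) return false;
        ChannelKey other = (ChannelKey) o;
        return port == other.port
                && Objects.equals(host, other.host)
                && Objects.equals(user, other.user);
    }

    @Override
    public int hashCode() {
        return Objects.hash(host, port, user);
    }
}

/** Creates one instance per channel on first use and hands out the same one afterwards. */
final class PerChannelCache<T> {
    private final Map<ChannelKey, T> instances = new ConcurrentHashMap<>();
    private final Function<ChannelKey, T> factory;

    PerChannelCache(Function<ChannelKey, T> factory) {
        this.factory = factory;
    }

    T get(ChannelKey channel) {
        // computeIfAbsent invokes the factory at most once per key, also under concurrency
        return instances.computeIfAbsent(channel, factory);
    }
}
```

With this shape, task1 and task3 from the question would receive the same instance because they share host, port, and user, while a task on a different server would get its own.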
If you still insist on using your method from the mentioned integration flow, consider a POJO method call for that @ServiceActivator(inputChannel = "sftpChannel"). See more in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/configuration.html#annotations.
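A minimal sketch of such a POJO service activator, assuming the sftpChannel and the inbound channel adapter from the question are in place and that it replaces the handler() bean shown there. The class name SftpFileProcessor and the method name process are made up for illustration.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Component
public class SftpFileProcessor {

    // Called for every message on "sftpChannel". The payload is the local copy
    // that the synchronizing message source has already downloaded.
    @ServiceActivator(inputChannel = "sftpChannel")
    public void process(File localFile) throws IOException {
        String payload = new String(
                Files.readAllBytes(localFile.toPath()), StandardCharsets.UTF_8);
        // hand the content over to the existing business logic here
        System.out.println(payload);
    }
}
```

Because the poller drives the inbound adapter, this method runs whenever a new file has been synchronized, which is the behaviour described for the unscheduled task.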
You may also find the SFTP Outbound Gateway a useful component for your use case. It implements some common scenarios: https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#sftp-outbound-gateway
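As a rough sketch of an on-demand GET through the outbound gateway, assuming the JSch-based Spring Integration 5.x API used in the question, with @EnableIntegration and @IntegrationComponentScan active. The channel name sftpGetChannel, the interface SftpGetGateway, and the local directory sftp-download are assumptions made for this example.

```java
import java.io.File;

import com.jcraft.jsch.ChannelSftp.LsEntry;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.gateway.SftpOutboundGateway;
import org.springframework.messaging.MessageHandler;

@Configuration
public class SftpGetConfig {

    // Hypothetical entry point: send a remote path, receive the downloaded local file.
    @MessagingGateway(defaultRequestChannel = "sftpGetChannel")
    public interface SftpGetGateway {
        File get(String remotePath);
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpGetChannel")
    public MessageHandler sftpGetHandler(SessionFactory<LsEntry> sftpSessionFactory) {
        // "get" is the SFTP command; "payload" is a SpEL expression evaluated
        // against the request message to obtain the remote file path.
        SftpOutboundGateway gateway =
                new SftpOutboundGateway(sftpSessionFactory, "get", "payload");
        gateway.setLocalDirectory(new File("sftp-download")); // assumed target directory
        return gateway;
    }
}
```

A scheduled task could then inject SftpGetGateway and call get(remoteDir + "/" + remoteFilename), so the command is fixed once in the bean definition and only the path varies per call. The reference documentation linked above also describes command options, for example for removing the remote file after a successful transfer.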