First of all you can find my code at https://github.com/keiseithunder/spring-sftp-xml-to-json/blob/main/src/main/java/com/demo/sftp/SftpApplication.java
I have a spring integration that read file from SFTP server into a stream and send to kafka and then trying to move a remote file to other directory in the same remote server after succesfully produce the message. But, the file doesn't move at all and didn't throw any error.
I have try using this code to move a file from upload/test.xml
(the file in upload/file/test.xml
is a backup that I use to copy to upload
directory) to upload/processed/test.xml
.
@Bean
@ServiceActivator(inputChannel = "success")
public MessageHandler handler() {
return new SftpOutboundGateway(sftpSessionFactory(), "mv", "");
}
I already set file_renameTo=upload/processed/test.xml
in the headers
. Not sure, what I do wrong. Or there is a way to use something like advice.setOnSuccessExpressionString("@template.copy(headers['file_remoteDirectory']+'/'+headers['file_remoteFile'])");
to move a file?
My Message is
"GenericMessage [payload=Note(to=Toves, from=Jani, heading=Reminder, body=Don't forget me this weekend!!!!!), headers={file_remoteHostPort=localhost:2222, file_remoteFileInfo={"directory":false,"filename":"test.xml","link":false,"modified":1674550080000,"permissions":"rw-r--r--","remoteDirectory":"upload","size":122}, kafka_messageKey=test.xml, file_remoteDirectory=upload, kafka_recordMetadata=test-0@298, file_renameTo=upload/processed/test.xml, id=708d04c4-5abc-9f45-e83b-1fea7ffa5e8d, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@2e0aa05, file_remoteFile=test.xml, timestamp=1674556856288}]"
PS. I have try using debugger to see what wrong and found that it seem to raise an error at
private String obtainRemoteFilePath(Message<?> requestMessage) {
Error here----> String remoteFilePath = this.fileNameProcessor.processMessage(requestMessage);
Assert.state(remoteFilePath != null,
() -> "The 'fileNameProcessor' evaluated to null 'remoteFilePath' from message: " + requestMessage);
return remoteFilePath;
}
"class com.demo.sftp.models.Note cannot be cast to class java.lang.String (com.demo.sftp.models.Note is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @387f9ed2; java.lang.String is in module java.base of loader 'bootstrap')"
EDIT 1: Add solution
@Bean
public MessageChannel streamChannel() {
return new PublishSubscribeChannel();
}
@Bean
@Order(Ordered.LOWEST_PRECEDENCE)
@ServiceActivator(inputChannel = "streamChannel")
public MessageHandler moveFile() {
SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), Command.MV.getCommand(),
"headers['file_remoteDirectory'] + '/' + headers['file_remoteFile']");
sftpOutboundGateway
.setRenameExpressionString(
"headers['file_remoteDirectory'] + '/processed/' +headers['timestamp'] + '-' + headers['file_remoteFile']");
sftpOutboundGateway.setRequiresReply(false);
sftpOutboundGateway.setUseTemporaryFileName(true);
sftpOutboundGateway.setOutputChannelName("nullChannel");
sftpOutboundGateway.setOrder(Ordered.LOWEST_PRECEDENCE);
sftpOutboundGateway.setAsync(true);
return sftpOutboundGateway;
}
setRenameExpressionString
to match the ErrorMessage
structure. Eg. @Bean
@Order(Ordered.LOWEST_PRECEDENCE)
@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
public MessageHandler moveErrorFile() {
SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), Command.MV.getCommand(),
"payload['failedMessage']['headers']['file_remoteDirectory'] + '/' + payload['failedMessage']['headers']['file_remoteFile']");
sftpOutboundGateway
.setRenameExpressionString(
"payload['failedMessage']['headers']['file_remoteDirectory'] + '/error/' + payload['failedMessage']['headers']['timestamp'] + '-' + payload['failedMessage']['headers']['file_remoteFile']");
sftpOutboundGateway.setRequiresReply(false);
sftpOutboundGateway.setUseTemporaryFileName(true);
sftpOutboundGateway.setOutputChannelName("nullChannel");
sftpOutboundGateway.setOrder(Ordered.HIGHEST_PRECEDENCE);
sftpOutboundGateway.setAsync(true);
return sftpOutboundGateway;
}
As artem-bilan answer in Spring integration - SFTP rename or move file in remote server after copying
I can solve the issue by change my InboundChannelAdapter
to PublishSubscribeChannel
and create new subscriber like below
@Bean
@Order(Ordered.LOWEST_PRECEDENCE)
@ServiceActivator(inputChannel = "streamChannel")
public MessageHandler moveFile() {
SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), Command.MV.getCommand(),
"headers['file_remoteDirectory'] + '/' + headers['file_remoteFile']");
sftpOutboundGateway
.setRenameExpressionString("headers['file_remoteDirectory'] + '/processed/' +headers['timestamp'] + '-' + headers['file_remoteFile']");
sftpOutboundGateway.setRequiresReply(false);
sftpOutboundGateway.setUseTemporaryFileName(true);
sftpOutboundGateway.setOutputChannelName("nullChannel");
sftpOutboundGateway.setOrder(Ordered.LOWEST_PRECEDENCE);
sftpOutboundGateway.setAsync(true);
return sftpOutboundGateway;
}