I want to upload all files from the local folder ~/sftp-outbound/Export
to an SFTP server. The folder contains two files, foo1.TEST.txt and foo2.TEST.txt.
I am doing it with the MPUT command of an SftpOutboundGateway within a Flow/Subflow DSL style (there are actually more gateway methods, which I have removed for better focus and readability).
My configuration is this:
@Bean
public TransferChannel myChannel() {
    LOG.debug("myChannel");
    TransferChannel channel = new TransferChannel();
    channel.setHost(myEnv.getSftpHost());
    channel.setPort(myEnv.getSftpPort());
    channel.setUser(myEnv.getSftpUser());
    channel.setPrivateKey(myEnv.getSftpPrivateKey());
    channel.setPassword(myEnv.getSftpPassword());
    return channel;
}
@Bean
public TransferContext myContext(TransferChannel myChannel) {
    LOG.debug("myContext");
    TransferContext context = new TransferContext();
    context.setEnabled(env.isEnabled());
    context.setChannel(myChannel);
    context.setPreserveTimestamp(true);
    context.setLocalDir(env.getLocalDir());
    context.setLocalFilename(env.getLocalFilename());
    context.setRemoteDir(env.getRemoteDir());
    return context;
}
@Bean
public SessionFactory<LsEntry> myFactory(TransferChannel myChannel) {
    LOG.debug("myFactory");
    DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
    sf.setHost(myChannel.getHost());
    sf.setPort(myChannel.getPort());
    sf.setUser(myChannel.getUser());
    if (myChannel.getPrivateKey() != null) {
        sf.setPrivateKey(myChannel.getPrivateKey());
    } else {
        sf.setPassword(myChannel.getPassword());
    }
    sf.setAllowUnknownKeys(true);
    return new CachingSessionFactory<LsEntry>(sf);
}
@Bean
public IntegrationFlow myFlow(SessionFactory<LsEntry> myFactory, TransferContext myContext) {
    LOG.debug("myFlow");
    return IntegrationFlows.from(MyGateway.class, g -> g
            .header("method", args -> args.getMethod().getName()))
        .log()
        .route(Message.class, m -> m.getHeaders().get("method", String.class),
            r -> r
                .subFlowMapping("mput", f2 -> f2
                    .handle(Sftp.outboundGateway(
                        remoteFileTemplate(myFactory,
                            new SpelExpressionParser().parseExpression(
                                "headers['" + FileHeaders.REMOTE_DIRECTORY + "']")),
                        Command.MPUT, "payload"))
                ))
        .get();
}
@Bean
public RemoteFileTemplate<LsEntry> remoteFileTemplate(SessionFactory<LsEntry> sessionFactory,
        Expression directory) {
    RemoteFileTemplate<LsEntry> template = new SftpRemoteFileTemplate(sessionFactory);
    template.setRemoteDirectoryExpression(directory);
    template.setAutoCreateDirectory(false);
    template.afterPropertiesSet();
    return template;
}
public interface MyGateway {
    List<String> mput(String localDir,
            @Header(FileHeaders.REMOTE_DIRECTORY) String remoteDirectory);
}
The call of the MyGateway method is:
@Autowired
private MyGateway gate;
...
String localFilename = "~/sftp-outbound/Export";
LOG.debug("runAsTask mput={}", localFilename);
jobLOG.info("put files to SFTP: {}", localFilename);
List<String> result = gate.mput(localFilename, env.getRemoteDir());
LOG.debug("runAsTask, files transferred, result={}", result);
This produces the following log output:
2022-10-21 00:02:00.000 INFO [] --- [pool-125-thread-1] job-ExportData : put files to SFTP: ~/sftp-outbound/Export
2022-10-21 00:02:00.001 INFO [] --- [pool-125-thread-1] o.s.integration.handler.LoggingHandler : GenericMessage [payload=~/cdb/sftp-outbound/Export, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ce6c62d4, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ce6c62d4, id=02f7972e-ef06-2297-b1dd-fed5cd75a2d2, method=mput, file_remoteDirectory=data/export, timestamp=1666303320001}]
2022-10-21 00:02:00.008 DEBUG [] --- [pool-125-thread-1] .l.c.c.j.ExportDataTask : runAsTask, files transferred, result=[data/export/02f7972e-ef06-2297-b1dd-fed5cd75a2d2.msg]
One sees two things: only a single file is transferred, and it is named 02f7972e-ef06-2297-b1dd-fed5cd75a2d2.msg instead of foo1.TEST.txt or foo2.TEST.txt.
What am I doing wrong?
EDIT 1
Trying to find the problem, I slightly changed the integration flow configuration, setting the remote directory inline to make the RemoteFileTemplate bean obsolete:
.subFlowMapping("mput", f2 -> f2
    .handle(Sftp.outboundGateway(myFactory, Command.MPUT, "payload")
        .autoCreateDirectory(false)
        .remoteDirectoryExpression(myContext.getRemoteDir()))
Following your answer, Artem, I changed the gateway method's parameter to type java.io.File as follows:
public interface MyGateway {
    List<String> mput(File localDir);
}
The mput gateway method call is now:
List<String> result = gate.mput(new File(localFilename));
But this did not resolve my problem. Now I get the following error:
2022-10-22 00:11:30.321 ERROR [] --- [pool-18-thread-1] .l.c.c.j.MyExportTask : MyExports failed! Exception: {}
org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'data' cannot be found on object of type 'org.springframework.integration.support.MutableMessage' - maybe not public or not valid?
at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:217)
at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:104)
at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:91)
at org.springframework.expression.spel.ast.OpDivide.getValueInternal(OpDivide.java:49)
at org.springframework.expression.spel.ast.OpMinus.getValueInternal(OpMinus.java:98)
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:117)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:375)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:171)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:129)
at org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor.processMessage(ExpressionEvaluatingMessageProcessor.java:107)
at org.springframework.integration.file.remote.RemoteFileTemplate.doSend(RemoteFileTemplate.java:325)
at org.springframework.integration.file.remote.RemoteFileTemplate.lambda$send$0(RemoteFileTemplate.java:301)
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:439)
at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:301)
at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:289)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.put(AbstractRemoteFileOutboundGateway.java:807)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.lambda$doPut$11(AbstractRemoteFileOutboundGateway.java:793)
at org.springframework.integration.file.remote.RemoteFileTemplate.invoke(RemoteFileTemplate.java:471)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doPut(AbstractRemoteFileOutboundGateway.java:792)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doMput(AbstractRemoteFileOutboundGateway.java:854)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.handleRequestMessage(AbstractRemoteFileOutboundGateway.java:594)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:233)
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:46)
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97)
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:38)
at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:96)
at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:86)
at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:514)
at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive(MessagingGatewaySupport.java:488)
at org.springframework.integration.gateway.GatewayProxyFactoryBean.sendOrSendAndReceive(GatewayProxyFactoryBean.java:648)
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:573)
at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:540)
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:529)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
at com.sun.proxy.$Proxy1232.mput(Unknown Source)
at com.lhsystems.cdb.cdbjob.job.MyExportTask.runAsTask(MyExportTask.java:73)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:840)
I have no clue what causes that exception. The changed subflow mapping does not even use a SpEL expression anymore. Any help is highly welcome!
So, you send a String localFilename to that gateway.
The logic there then falls into this branch:
else if (payload instanceof String) {
    file = new File((String) payload);
}
And according to your observation and the execution result, we end up here:
else if (!file.isDirectory()) {
    return doPut(requestMessage);
}
For some reason it does not recognize the file created from your ~/sftp-outbound/Export as a directory and just performs a plain PUT with a single file.
Try resolving your ~/sftp-outbound/Export into a File object yourself and send that one to this gateway.
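One possible reason the path is not seen as a directory: java.io.File does not expand a leading ~ (that is done by the shell), so new File("~/sftp-outbound/Export") is resolved relative to the working directory of the JVM. Below is a minimal sketch of resolving it by hand, assuming ~ is meant as the home directory of the user running the application. The HomePathResolver class and its resolveHome method are hypothetical names for illustration, not part of Spring Integration.

```java
import java.io.File;
import java.nio.file.Paths;

public class HomePathResolver {

    /**
     * Expands a leading "~" to the value of the user.home system property.
     * Any other path is wrapped in a File unchanged.
     */
    public static File resolveHome(String path) {
        String home = System.getProperty("user.home");
        if (path.equals("~")) {
            return new File(home);
        }
        if (path.startsWith("~/")) {
            // Drop the "~/" prefix and append the rest to the home directory.
            return Paths.get(home, path.substring(2)).toFile();
        }
        return new File(path);
    }

    public static void main(String[] args) {
        String raw = "~/sftp-outbound/Export";
        File unresolved = new File(raw);   // "~" is treated as a literal directory name
        File resolved = resolveHome(raw);  // "~" is replaced by user.home

        System.out.println("unresolved: " + unresolved.getAbsolutePath()
                + " isDirectory=" + unresolved.isDirectory());
        System.out.println("resolved:   " + resolved.getAbsolutePath()
                + " isDirectory=" + resolved.isDirectory());
    }
}
```

With something like this in place, the call could become gate.mput(HomePathResolver.resolveHome(localFilename)), so the gateway receives a File for which isDirectory() can return true, provided the folder actually exists under the home directory.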