
How to put files with SftpOutboundGateway MPUT command properly?


I want to upload all files from the local folder ~/sftp-outbound/Export to an SFTP server. The folder contains two files:

  • foo1.TEST.txt
  • foo2.TEST.txt

I do this with the MPUT command of an SftpOutboundGateway, configured in the flow/subflow DSL style (there are more gateway methods, which I have removed for focus and readability).

My configuration is this:

@Bean
public TransferChannel myChannel() {
    LOG.debug("myChannel");
    TransferChannel channel = new TransferChannel();
    channel.setHost(myEnv.getSftpHost());
    channel.setPort(myEnv.getSftpPort());
    channel.setUser(myEnv.getSftpUser());
    channel.setPrivateKey(myEnv.getSftpPrivateKey());
    channel.setPassword(myEnv.getSftpPassword());
    return channel;
}

@Bean
public TransferContext myContext(TransferChannel myChannel) {
    LOG.debug("myContext");
    TransferContext context = new TransferContext();
    context.setEnabled(env.isEnabled());
    context.setChannel(myChannel);
    context.setPreserveTimestamp(true);
    context.setLocalDir(env.getLocalDir());
    context.setLocalFilename(env.getLocalFilename());
    context.setRemoteDir(env.getRemoteDir());
    return context;
}

@Bean
public SessionFactory<LsEntry> myFactory(TransferChannel myChannel) {
    LOG.debug("myFactory");
    DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
    sf.setHost(myChannel.getHost());
    sf.setPort(myChannel.getPort());
    sf.setUser(myChannel.getUser());
    if (myChannel.getPrivateKey() != null) {
        sf.setPrivateKey(myChannel.getPrivateKey());
    } else {
        sf.setPassword(myChannel.getPassword());
    }
    sf.setAllowUnknownKeys(true);
    return new CachingSessionFactory<LsEntry>(sf);
}

@Bean
public IntegrationFlow myFlow(SessionFactory<LsEntry> myFactory, TransferContext myContext) {
    LOG.debug("myFlow");
    return IntegrationFlows.from(MyGateway.class, g -> g
                .header("method", args -> args.getMethod().getName()))
        .log()
        .route(Message.class, m -> m.getHeaders().get("method", String.class),
                r -> r
                        .subFlowMapping("mput", f2 -> f2
                                .handle(Sftp.outboundGateway(
                                        remoteFileTemplate(myFactory,
                                                new SpelExpressionParser().parseExpression(
                                                        "headers['" + FileHeaders.REMOTE_DIRECTORY + "']")),
                                        Command.MPUT, "payload"))
                        ))
        .get();
}

@Bean
public RemoteFileTemplate<LsEntry> remoteFileTemplate(SessionFactory<LsEntry> sessionFactory,
        Expression directory) {
    RemoteFileTemplate<LsEntry> template = new SftpRemoteFileTemplate(sessionFactory);
    template.setRemoteDirectoryExpression(directory);
    template.setAutoCreateDirectory(false);
    template.afterPropertiesSet();
    return template;
}

public interface MyGateway {
    List<String> mput(String localDir,
            @Header(FileHeaders.REMOTE_DIRECTORY) String remoteDirectory);
}

The call of the MyGateway method is:

@Autowired
private MyGateway gate;

...
String localFilename = "~/sftp-outbound/Export";
LOG.debug("runAsTask mput={}", localFilename);
jobLOG.info("put files to SFTP: {}", localFilename);
List<String> result = gate.mput(localFilename, env.getRemoteDir());
LOG.debug("runAsTask, files transferred, result={}", result);

This produces the following log output:

2022-10-21 00:02:00.000  INFO [] --- [pool-125-thread-1]  job-ExportData                           : put files to SFTP: ~/sftp-outbound/Export
2022-10-21 00:02:00.001  INFO [] --- [pool-125-thread-1]  o.s.integration.handler.LoggingHandler   : GenericMessage [payload=~/cdb/sftp-outbound/Export, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ce6c62d4, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ce6c62d4, id=02f7972e-ef06-2297-b1dd-fed5cd75a2d2, method=mput, file_remoteDirectory=data/export, timestamp=1666303320001}]
2022-10-21 00:02:00.008 DEBUG [] --- [pool-125-thread-1]  .l.c.c.j.ExportDataTask                  : runAsTask, files transferred, result=[data/export/02f7972e-ef06-2297-b1dd-fed5cd75a2d2.msg]

One sees two things:

  1. there is only one file transferred instead of two
  2. the remote filename is 02f7972e-ef06-2297-b1dd-fed5cd75a2d2.msg instead of foo1.TEST.txt or foo2.TEST.txt

What am I doing wrong?

EDIT 1

While trying to find the problem, I slightly changed the integration flow configuration, setting the remote directory inline so that the RemoteFileTemplate bean is no longer needed:

.subFlowMapping("mput", f2 -> f2
        .handle(Sftp.outboundGateway(myFactory, Command.MPUT, "payload")
                .autoCreateDirectory(false)
                    .remoteDirectoryExpression(myContext.getRemoteDir()))

Following your answer, Artem, I changed the parameter of the MyGateway method to type java.io.File as follows:

public interface MyGateway {
    List<String> mput(File localDir);
}

The mput gateway method call is now:

List<String> result = gate.mput(new File(localFilename));

But this did not resolve my problem. Now I get the following error:


2022-10-22 00:11:30.321 ERROR [] --- [pool-18-thread-1]  .l.c.c.j.MyExportTask : MyExports failed! Exception: {}

org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'data' cannot be found on object of type 'org.springframework.integration.support.MutableMessage' - maybe not public or not valid?
        at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:217)
        at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:104)
        at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:91)
        at org.springframework.expression.spel.ast.OpDivide.getValueInternal(OpDivide.java:49)
        at org.springframework.expression.spel.ast.OpMinus.getValueInternal(OpMinus.java:98)
        at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:117)
        at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:375)
        at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:171)
        at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:129)
        at org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor.processMessage(ExpressionEvaluatingMessageProcessor.java:107)
        at org.springframework.integration.file.remote.RemoteFileTemplate.doSend(RemoteFileTemplate.java:325)
        at org.springframework.integration.file.remote.RemoteFileTemplate.lambda$send$0(RemoteFileTemplate.java:301)
        at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:439)
        at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:301)
        at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:289)
        at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.put(AbstractRemoteFileOutboundGateway.java:807)
        at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.lambda$doPut$11(AbstractRemoteFileOutboundGateway.java:793)
        at org.springframework.integration.file.remote.RemoteFileTemplate.invoke(RemoteFileTemplate.java:471)
        at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doPut(AbstractRemoteFileOutboundGateway.java:792)
        at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:301)
        at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:289)
        at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.put(AbstractRemoteFileOutboundGateway.java:807)
        at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.lambda$doPut$11(AbstractRemoteFileOutboundGateway.java:793)
        at org.springframework.integration.file.remote.RemoteFileTemplate.invoke(RemoteFileTemplate.java:471)
        at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doMput(AbstractRemoteFileOutboundGateway.java:854)
        at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.handleRequestMessage(AbstractRemoteFileOutboundGateway.java:594)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
        at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
        at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:233)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:46)
        at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97)
        at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:38)
        at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:96)
        at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:86)
        at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:514)
        at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive(MessagingGatewaySupport.java:488)
        at org.springframework.integration.gateway.GatewayProxyFactoryBean.sendOrSendAndReceive(GatewayProxyFactoryBean.java:648)
        at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:573)
        at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:540)
        at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:529)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
        at com.sun.proxy.$Proxy1232.mput(Unknown Source)
        at com.lhsystems.cdb.cdbjob.job.MyExportTask.runAsTask(MyExportTask.java:73)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:840)

I have no clue what causes that exception. The changed subflow mapping does not even use a SpEL expression anymore. Any help is highly welcome!
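
A possible reading of the trace, inferred from the frames rather than confirmed anywhere in this thread: remoteDirectoryExpression(String) treats its argument as a SpEL expression, so a plain path such as data/export (the file_remoteDirectory value in the earlier log) would be parsed as a property named data divided by a property named export. That would match the OpDivide and PropertyOrFieldReference frames and the complaint about 'data'. Wrapping the value in single quotes makes it a SpEL string literal. A minimal sketch of the changed subflow mapping, reusing the names from the configuration above and assuming the remote directory contains no single quote:

```java
.subFlowMapping("mput", f2 -> f2
        .handle(Sftp.outboundGateway(myFactory, Command.MPUT, "payload")
                .autoCreateDirectory(false)
                // Single quotes turn the path into a SpEL string literal,
                // so it is no longer read as "data divided by export".
                .remoteDirectoryExpression("'" + myContext.getRemoteDir() + "'")))
```

This is a fragment of the flow definition, not a complete class; everything around it stays as in the original configuration.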


Solution

  • So, you send a String localFilename to that gateway.

    The logic there then falls into this branch:

    else if (payload instanceof String) {
        file = new File((String) payload);
    }
    

    And, judging by your observation and the execution result, we end up here:

        else if (!file.isDirectory()) {
            return doPut(requestMessage);
        }
    

    Somehow it does not recognize the File created from your ~/sftp-outbound/Export as a directory, so it just performs a plain PUT of a single file.

    Try resolving your ~/sftp-outbound/Export to a File object yourself and sending that object to the gateway.
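
    One plausible reason for the failed directory check, stated here as an assumption and not something the log proves: java.io.File does not perform shell-style tilde expansion, so new File("~/sftp-outbound/Export") points at a directory literally named ~ under the working directory. The sketch below shows one way to resolve the path before calling the gateway. HomeDirResolver and its resolve method are hypothetical names introduced for illustration; they are not part of Spring Integration.

```java
import java.io.File;
import java.nio.file.Paths;

// Hypothetical helper, not part of Spring Integration.
class HomeDirResolver {

    // Replaces a leading "~" with the JVM's user.home system property.
    // Any other path is returned unchanged as a File.
    public static File resolve(String path) {
        String home = System.getProperty("user.home");
        if (path.equals("~")) {
            return new File(home);
        }
        if (path.startsWith("~/")) {
            return Paths.get(home, path.substring(2)).toFile();
        }
        return new File(path);
    }

    public static void main(String[] args) {
        String raw = "~/sftp-outbound/Export";

        // The raw string is taken literally; no expansion happens here.
        File unresolved = new File(raw);
        File resolved = resolve(raw);

        System.out.println("unresolved: " + unresolved.getAbsolutePath()
                + " isDirectory=" + unresolved.isDirectory());
        System.out.println("resolved:   " + resolved.getAbsolutePath()
                + " isDirectory=" + resolved.isDirectory());
    }
}
```

    If the resolved File reports isDirectory=true, it can be passed to the File-based gateway method from the question, for example gate.mput(HomeDirResolver.resolve(localFilename)).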