Search code examples
javaspring-bootspring-integrationmessagingspring-integration-dsl

Spring Integration error: Failure to write to 'foo/002.txt.writing' while uploading the file


I'm experimenting with the following sample setup from SI samples to understand its inner workings with a goal to then adapt it to similar set of tasks in upper environments.

DailyCashReportApplication.java

@Slf4j
@SpringBootApplication
@PropertySource(value = "classpath:feeds-config.yml", factory = com.pru.globalpayments.feeds.downstream.YamlPropertySourceFactory.class)
public class DailyCashReportApplication  implements CommandLineRunner  {
    
//  @Autowired
//  @Qualifier("syntheticRunner") //TODO: to be replaced by runners consuming from appropriate data sources
//  private AbstractRunner runner; 

    public static void main(String[] args) {
        log.info("DailyCashReportApplication running...");
    
        new SpringApplicationBuilder(DailyCashReportApplication.class).web(WebApplicationType.NONE).run(args);
        
    }

    @Override
    public void run(String... args) throws Exception {
        //runner.run(args);
    }

DcrConfig.java:

@Configuration
public class DcrConfig {

    private final static String EMAIL_SUCCESS_SUFFIX = "emailSuccessSuffix";
    
    // sftp

    @Value("${sftp.in.host}")
    @Getter
    private String host;

    @Value("${sftp.in.port}")
    private int port;

    @Value("${sftp.in.user}")
    private String user;

    @Value("${sftp.in.privateKey}")
    @Getter
    private String privateKeyLocation;

    @Value("${sftp.in.remoteDir}")
    @Getter
    private String remoteDir;



    @Value("${sftp.in.localDir}")
    @Getter
    private String localDir;

    @Value("${sftp.in.chmod}")
    private String chmod;

    @Value("${sftp.in.maxFetchSize}")
    @Getter
    private int maxFetchSize;

    @Value("${sftp.in.file.filter}")
    private String fileFilter;

    @Autowired
    private ResourceLoader resourceLoader;
    
    @Bean(name = "downloadSftpSessionFactory")
    public SessionFactory<LsEntry> sftpSessionFactory() throws IOException {

        Resource keyFileResource = resourceLoader.getResource(privateKeyLocation);

        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUser(user);
        factory.setPrivateKey(keyFileResource);
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<LsEntry>(factory);
    }

    
    
    @Bean
    public IntegrationFlow fromFile() {
        return IntegrationFlows.from(Files.inboundAdapter(new File(remoteDir))
                                        .preventDuplicates(false)
                                        .patternFilter("*.txt"), 
                e -> e.poller(Pollers.fixedDelay(5000))
                                        .id("fileInboundChannelAdapter"))
                .handle(Files.splitter(true, true))
                .<Object, Class<?>>route(Object::getClass, m->m
                        .channelMapping(FileSplitter.FileMarker.class, "markers.input")
                        .channelMapping(String.class, "lines.input"))
                .get();
    }
    
    @Bean
    public FileWritingMessageHandlerSpec fileOut() {
        return Files.outboundAdapter("'"+localDir+"'")
                .appendNewLine(true)
                .fileNameExpression("payload.substring(1,4) + '.txt'");
    }
    
    @Bean
    public IntegrationFlow lines(FileWritingMessageHandler fileOut) {
        return f -> f.handle(fileOut); 
    }
    

    
    @Bean
    public IntegrationFlow markers() throws IOException{
        return f -> f.<FileSplitter.FileMarker>filter(m -> m.getMark().equals(FileSplitter.FileMarker.Mark.END),
                        e -> e.id("markerFilter"))
                .publishSubscribeChannel(s -> s

                        // first trigger file flushes
                        .subscribe(sf -> sf.transform("'tmp/out/.*\\.txt'", e -> e.id("toTriggerPattern"))
                                .trigger("fileOut", e -> e.id("flusher")))

                        // send the first file
                        .subscribe(sf -> {
                            try {
                                sf.<FileSplitter.FileMarker, File>transform(p -> new File("tmp/out/002.txt"))
                                        .enrichHeaders(h -> h.header(FileHeaders.FILENAME, "002.txt", true))
                                        .handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp002"));
                            } catch (IOException e1) {
                                // TODO Auto-generated catch block
                                e1.printStackTrace();
                            }
                        })

                        // send the second file
                        .subscribe(sf -> {
                            try {
                                sf.<FileSplitter.FileMarker, File>transform(p -> new File("/tmp/out/006.txt"))
                                        .enrichHeaders(h -> h.header(FileHeaders.FILENAME, "006.txt", true))
                                        .handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp006"));
                            } catch (IOException e1) {
                                // TODO Auto-generated catch block
                                e1.printStackTrace();
                            }
                        })

                        // send the third file
                        .subscribe(sf -> {
                            try {
                                sf.<FileSplitter.FileMarker, File>transform(p -> new File("/tmp/out/009.txt"))
                                        .enrichHeaders(h -> h.header(FileHeaders.FILENAME, "009.txt", true))
                                        .handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp009"));
                            } catch (IOException e1) {
                                // TODO Auto-generated catch block
                                e1.printStackTrace();
                            }
                        })


                        );
    }
    

    
    

    

    
    @Bean
    public SessionFactory<LsEntry> ftp() throws IOException {

        Resource keyFileResource = resourceLoader.getResource(privateKeyLocation);


        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUser(user);
        factory.setPrivateKey(keyFileResource);
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<LsEntry>(factory);
    }
    
    @Bean
    public MethodInterceptor afterMailAdvice() {
        return invocation -> {
            Message<?> message = (Message<?>) invocation.getArguments()[0];
            MessageHeaders headers = message.getHeaders();
            File originalFile = headers.get(FileHeaders.ORIGINAL_FILE, File.class);
            try {
                invocation.proceed();
                originalFile.renameTo(new File(originalFile.getAbsolutePath() + headers.get(EMAIL_SUCCESS_SUFFIX)));
            }
            catch(Exception e) {
                originalFile.renameTo(new File(originalFile.getAbsolutePath() + headers.get(EMAIL_SUCCESS_SUFFIX)+"email.failed"));
            }
            return null;
        };
    }
    
}

application-dev.yml

spring:
  application:
    name: daily-cash-report-application

sftp:
  in:
    host: eafdev
    port: 22
    user: eafdev
    remoteDir: tmp/in 
    tmpDir: 
    localDir: tmp/out 
    maxFetchSize: 1    
    privateKey: file:///C:/Users/x12345/AppData/Roaming/SSH/UserKeys/distributessh
    chmod: 664 
    poller:
      fixedDelay: 10000
    file:
      filter: A.B.*    #TODO: calibrate once spec is known
  out:
    host: eafdev
    port: 22
    user: eafdev
    remoteDir: tmp/out  
    privateKey: file:///C:/Users/x12345/AppData/Roaming/SSH/UserKeys/distributessh
    chmod: 664 

file: 
  out:   
    targetDir: 
    tmpDir:

DailyCashReportApplicationTest.java:

@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@SpringIntegrationTest(noAutoStartup = "fileInboundChannelAdapter")
@SpringJUnitConfig(classes = { 
        DailyCashReportApplication.class,
        DcrConfig.class,
        })
@EnableConfigurationProperties
@PropertySource(value = "application-dev.yml", factory = YamlPropertySourceFactory.class)
@Slf4j
class DailyCashReportApplicationTest {

    @Test
    @SneakyThrows
    void test()  {

        File in = new File("tmp/in/", "foo");
        FileOutputStream fos = new FileOutputStream(in);
        fos.write("*002,foo,bar\n*006,baz,qux\n*009,fiz,buz\n".getBytes());
        fos.close();
        in.renameTo(new File("tmp/in/", "foo.txt"));

        File out = new File("tmp/out/002.txt");
        int n = 0;
        while (n++ < 100 && (!out.exists() || out.length() < 12)) {
            Thread.sleep(100);
        }
        assertThat(out.exists()).isTrue();

    }

}

Regardless if run via *ApplicationTest.java, or via executing the above *Application.java file in the IDE and dropping the same foo.txt content in the pre-created tmp/in folder, I'm seeing partially correct behavior in terms of tmp\in\foo.txt being picked up and 3 files being created in the tmp/out accompanied by the following error:

2023-09-20 11:40:21.384 [scheduling-1] ERROR o.s.i.handler.LoggingHandler - org.springframework.messaging.MessageDeliveryException: Error handling message for file [C:\code\STS\daily-cash-report\tmp\out\002.txt -> 002.txt]; nested exception is org.springframework.messaging.MessagingException: Failed to write to 'foo/002.txt.writing' while uploading the file; nested exception is java.io.IOException: failed to write file, failedMessage=GenericMessage [payload=tmp\out\002.txt, headers={file_lineCount=3, file_name=002.txt, file_originalFile=tmp\in\foo.txt, id=365e346c-78d4-47ab-d6c0-e4f0f5f72f43, file_marker=END, file_relativePath=foo.txt, timestamp=1695224420443}]
    at org.springframework.integration.file.remote.RemoteFileTemplate.doSend(RemoteFileTemplate.java:368)
    at org.springframework.integration.file.remote.RemoteFileTemplate.lambda$send$0(RemoteFileTemplate.java:314)
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:452)
    at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:314)
    at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:302)
    at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:294)
    at org.springframework.integration.file.remote.handler.FileTransferringMessageHandler.handleMessageInternal(FileTransferringMessageHandler.java:207)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:456)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:456)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:456)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:178)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:456)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:456)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
    at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:317)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:196)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:474)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:460)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:347)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:340)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.springframework.messaging.MessagingException: Failed to write to 'foo/002.txt.writing' while uploading the file; nested exception is java.io.IOException: failed to write file
    at org.springframework.integration.file.remote.RemoteFileTemplate.sendFileToRemoteDirectory(RemoteFileTemplate.java:573)
    at org.springframework.integration.file.remote.RemoteFileTemplate.doSend(RemoteFileTemplate.java:353)
    ... 127 more
Caused by: java.io.IOException: failed to write file
    at org.springframework.integration.sftp.session.SftpSession.write(SftpSession.java:175)
    at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.write(CachingSessionFactory.java:237)
    at org.springframework.integration.file.remote.RemoteFileTemplate.doSend(RemoteFileTemplate.java:582)
    at org.springframework.integration.file.remote.RemoteFileTemplate.sendFileToRemoteDirectory(RemoteFileTemplate.java:570)
    ... 128 more
Caused by: 2: No such file
    at com.jcraft.jsch.ChannelSftp.throwStatusError(ChannelSftp.java:2873)
    at com.jcraft.jsch.ChannelSftp._put(ChannelSftp.java:594)
    at com.jcraft.jsch.ChannelSftp.put(ChannelSftp.java:540)
    at com.jcraft.jsch.ChannelSftp.put(ChannelSftp.java:492)
    at org.springframework.integration.sftp.session.SftpSession.write(SftpSession.java:172)
    ... 131 more

The gist seems to be: Failed to write to 'foo/002.txt.writing' while uploading the file; nested exception is java.io.IOException: failed to write file, failedMessage=GenericMessage [payload=tmp\out\002.txt, headers={file_lineCount=3, file_name=002.txt, file_originalFile=tmp\in\foo.txt, id=365e346c-78d4-47ab-d6c0-e4f0f5f72f43, file_marker=END, file_relativePath=foo.txt, timestamp=1695224420443}]

It is thrown on a brand-new file being dropped in the tmp\in folder and then repeatedly on every poller execution while the split into the 3 resulting files: 002.txt, 006.txt and 009.txt happens correctly, as mentioned above.

I'm aware of the .writing extension (as per docs), however this doesn't seem like an expected behavior. While playing with that SI sample, I'd like to get to the bottom of why this error is happening so I can eliminate it migrating the app from this experimental environment into the real ones.


Solution

  • It fails because there is just no that foo remote directory to upload file into.

    See autoCreateDirectory(true) option:

    /**
     * A {@code boolean} flag to indicate automatically create the directory or not.
     * @param autoCreateDirectory true to automatically create the directory.
     * @return the current Spec
     */
    public S autoCreateDirectory(boolean autoCreateDirectory) {
    

    It is false by default on the RemoteFileTemplate.

    Therefore your config must be like this:

    Sftp.outboundAdapter(ftp()).remoteDirectory("foo").autoCreateDirectory(true)