Search code examples
javaspring-integrationspring-dsl

Spring Integration open a new ssh connection after each SftpException


Versions: spring-boot-starter-parent : 3.2.3 java: 17

maven artifacts : spring-boot-starter-integration spring-integration-sftp

I'm facing an issue with the number of ssh connections opened to my SFTP server.

All works fine when there is no exception, I keep the same session open to check and transfert files. But as soon as there is an Exception, the connections keep increasing on each encountered error until it overload the SFTP.

Here is the interresting part of my code :

    @Bean
    public CachingSessionFactory<SftpClient.DirEntry> sftpSessionFactory(
            final SftpPrivateKeyConfigurationProperties privateKeyConfigurationProperties,
            final SftpConfigurationProperties sftpConfigurationProperties) {
        final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(privateKeyConfigurationProperties.getHost());
        factory.setPort(privateKeyConfigurationProperties.getPort());
        factory.setUser(privateKeyConfigurationProperties.getUser());
        factory.setPrivateKey(new ByteArrayResource(privateKeyConfigurationProperties.getPrivateKey().getBytes()));
        factory.setAllowUnknownKeys(true);
        factory.setTimeout(Math.toIntExact(sftpConfigurationProperties.getTimeout().toMillis()));
        var cachedFactory = new CachingSessionFactory<>(factory, sftpConfigurationProperties.getPoolSize());
        cachedFactory.setSessionWaitTimeout(sftpConfigurationProperties.getSessionWaitTimeout().toMillis());
        return cachedFactory;
    }

    @Bean
    IntegrationFlow flowSubAccount(final SftpConfigurationProperties sftpConfigurationProperties,
            final CachingSessionFactory<SftpClient.DirEntry> sessionFactory,
            @Qualifier("sftpFilter") final ChainFileListFilter<SftpClient.DirEntry> sftpFilter,
            final ConfigurableApplicationContext ctx)
            throws IOException {

        return IntegrationFlow.from(
                Sftp.inboundAdapter(sessionFactory, Comparator.comparingLong(File::lastModified))
                        .preserveTimestamp(true)
                        .deleteRemoteFiles(sftpConfigurationProperties.getDeleteRemoteFiles())
                        .remoteDirectory(sftpConfigurationProperties.getRemoteDirectory())
                        .filter(sftpFilter)
                        .localDirectory(new File(sftpConfigurationProperties.getLocalDirectory()))
                        .localFilenameExpression("#root")
                        .autoCreateLocalDirectory(true),
                e -> e.autoStartup(true).poller(Pollers.fixedDelay(sftpConfigurationProperties.getPollDuration()).maxMessagesPerPoll(1).errorHandler((ex) -> {
                    log.error("An error has been encountered", ex);
                })))
                .route(Message.class, this::payloadTypeSubAccountRouter)
                .get();

Every time the poller retrieve a file, if an exception occurs (not enough permission on file, no such file) there is a new ssh connection created.

Here one of the stacktrace :

org.springframework.messaging.MessagingException: Problem occurred while synchronizing 'this/path/does/not/exists' to local directory
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:348) ~[spring-integration-file-6.2.2.jar:6.2.2]
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:267) ~[spring-integration-file-6.2.2.jar:6.2.2]
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:69) ~[spring-integration-file-6.2.2.jar:6.2.2]
    at org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:47) ~[spring-integration-core-6.2.2.jar:6.2.2]
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142) ~[spring-integration-core-6.2.2.jar:6.2.2]
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:222) ~[spring-integration-core-6.2.2.jar:6.2.2]
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:450) ~[spring-integration-core-6.2.2.jar:6.2.2]
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:419) ~[spring-integration-core-6.2.2.jar:6.2.2]
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:355) ~[spring-integration-core-6.2.2.jar:6.2.2]
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57) ~[spring-integration-core-6.2.2.jar:6.2.2]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-6.1.4.jar:6.1.4]
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55) ~[spring-integration-core-6.2.2.jar:6.2.2]
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:348) ~[spring-integration-core-6.2.2.jar:6.2.2]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-6.1.4.jar:6.1.4]
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96) ~[spring-context-6.1.4.jar:6.1.4]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:461) ~[spring-integration-file-6.2.2.jar:6.2.2]
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:341) ~[spring-integration-file-6.2.2.jar:6.2.2]
    ... 20 common frames omitted
Caused by: org.apache.sshd.sftp.common.SftpException: No such file
    at org.apache.sshd.sftp.client.impl.AbstractSftpClient.throwStatusException(AbstractSftpClient.java:277) ~[sshd-sftp-2.11.0.jar:2.11.0]
    at org.apache.sshd.sftp.client.impl.AbstractSftpClient.checkAttributesResponse(AbstractSftpClient.java:333) ~[sshd-sftp-2.11.0.jar:2.11.0]
    at org.apache.sshd.sftp.client.impl.AbstractSftpClient.checkAttributes(AbstractSftpClient.java:325) ~[sshd-sftp-2.11.0.jar:2.11.0]
    at org.apache.sshd.sftp.client.impl.AbstractSftpClient.lstat(AbstractSftpClient.java:1010) ~[sshd-sftp-2.11.0.jar:2.11.0]
    at org.springframework.integration.sftp.session.SftpSession.doList(SftpSession.java:102) ~[spring-integration-sftp-6.2.2.jar:6.2.2]
    at org.springframework.integration.sftp.session.SftpSession.list(SftpSession.java:80) ~[spring-integration-sftp-6.2.2.jar:6.2.2]
    at org.springframework.integration.sftp.session.SftpSession.list(SftpSession.java:52) ~[spring-integration-sftp-6.2.2.jar:6.2.2]
    at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.list(CachingSessionFactory.java:246) ~[spring-integration-file-6.2.2.jar:6.2.2]
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.transferFilesFromRemoteToLocal(AbstractInboundFileSynchronizer.java:356) ~[spring-integration-file-6.2.2.jar:6.2.2]
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.lambda$synchronizeToLocalDirectory$0(AbstractInboundFileSynchronizer.java:342) ~[spring-integration-file-6.2.2.jar:6.2.2]
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:452) ~[spring-integration-file-6.2.2.jar:6.2.2]
    ... 21 common frames omitted

The sessions keep increasing until the shutdown of the application. As soon as the application is stopped, the connections are closed and the sftp server is back to normal. (Seen more thant 300 connections)

I tried to replace the CachingSessionFactory with a default session factory, but this is worse obviously. I tried to put logs in TRACE for Spring Integration to check the path of the sessions. It seems that Spring, when the exception occurs, ask for the close of the connection, flag it dirty and create a new one. But the "dirty connection" keeps opened.

Logs said that : session is flagged dirty Spring remove it from the pool of connection Spring create a new one The connection removed from pool remain active.

I tried to close completely the pool of connection to create a new one, but i was facing other issues.

An idea, but seems weird, should be to stop my flow a "restart" it when this kind of exception is caught.

Do you have any idea of to close completely these dirty sessions, still active?


Solution

  • The issue was not with the SFTPSession or the SFTPClient, but the ClientSession.

    In SpringIntegration DefaultSftpSessionFactory, int the getSession method we can find these lines :

    if (sftpClient == null || !sftpClient.isOpen()) {
        sftpClient = createSftpClient(initClientSession(), this.sftpVersionSelector, SftpErrorDataHandler.EMPTY);
        freshSftpClient = true;
    }
    

    The initClientSession() method create a new SshClient.

    So with any error occuring during the flow, leading to a dirty SftpSession, this SftpSession is closed properly, but the underlying ClientSession (SSH) stay alive and another one is created.

    To solve this behaviour, you can create your own version of the DefaultSftpSessionFactory and add a cache for your clientSession if you want to reuse it, or close it properly if it exists before creating a new one.