Search code examples
javaspringspring-integrationspring-integration-sftp

Why does the SFTP Outbound Gateway not start working as soon as I start its Integration Flow?


My scenario is that I have a Spring Integration Flow with a SftpInboundAdapter which shall get a file from a SftpServer "myHost". The file contains JSON which is to be converted into MyEvent entities to be forwarded to further processing. The process is implemented in a task scheduled by Spring Scheduler. Therefore the Integration Flow is not to start automatically with the Application having that autoStartup(false).

The Spring Integration Flow is:

  • testSftpSessionFactory: to provide the session to the SFTP server
  • testSftpInboundAdapter: to get the SFTP remote file
  • sftpInputChannel: a Publish-Subscribe channel to have multiple message consumers
  • sftpInputChannel-MessageHandler: to get the JSON content of the file transformed
  • deleteLocalFileService-MessageHandler: to delete the remote file after successful processing
  • controlChannel: to send Integration Flow control commands
  • controlChannel-ExpressionControlBusFactoryBean: to start the testSftpInboundAdapter

The types TransferContext, TransferChannel and MyService are Java classes of mine with some fields fed from YAML properties which provide values for the sftpSessionFactory as host, port, user, password and the sftpInboundAdapter as remoteDirectory, remoteFilename, preserveTimestamp, localDirectory, etc. The Service is to process the MyEvent entities.

These are my SI beans:

@Configuration
public class MySftpConfiguration {
    private static final Logger LOG = LoggerFactory.getLogger(MySftpConfiguration.class);

    @Bean
    public SessionFactory<LsEntry> testSftpSessionFactory(TransferChannel transferChannel) {
        LOG.debug("testSftpSessionFactory");
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost(transferChannel.getHost());
        sf.setPort(transferChannel.getPort());
        sf.setUser(transferChannel.getUser());
        sf.setPassword(transferChannel.getPassword());
        sf.setAllowUnknownKeys(true);
        return new CachingSessionFactory<LsEntry>(sf);
    }

    @Bean
    public IntegrationFlow testSftpInboundFlow(TransferContext context) {
        LOG.debug("testSftpInboundFlow");
        return IntegrationFlows
            .from(Sftp.inboundAdapter(testSftpSessionFactory(context.getChannel()))
                    .preserveTimestamp(context.isPreserveTimestamp())
                    .remoteDirectory(context.getRemoteDir())
                    .regexFilter(context.getRemoteFilename())
                    .deleteRemoteFiles(context.isRemoteRemove())
                    .autoCreateLocalDirectory(context.isAutoCreateLocalDir())
                    .localFilenameExpression(context.getLocalFilename())
                    .localDirectory(new File(context.getLocalDir())),
                e -> e.id("sftpInboundAdapter")
                    .autoStartup(false)
                    .poller(Pollers.fixedDelay(5000))
                )
            .transform(Transformers.fromJson(Event[].class))
            .channel("sftpInputChannel")
            .get();
    }

    @Bean
    public PublishSubscribeChannel sftpInputChannel() {
        LOG.debug("sftpInputChannel");
        return new PublishSubscribeChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpInputChannel")
    public MessageHandler handler() {
        LOG.debug("MessageHandler-handler");
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                LOG.debug("handleMessage message={}", message);
                myService().myServiceMethod((myEvent[]) message.getPayload());
            }
        };
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpInputChannel")
    public MessageHandler deleteLocalFileService() {
        /* ... */
    }

    @Bean
    public MessageChannel controlChannel() {
        LOG.debug("controlChannel");
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "controlChannel")
    public ExpressionControlBusFactoryBean controlBus() {
        LOG.debug("controlBus");
        ExpressionControlBusFactoryBean expressionControlBusFactoryBean = new ExpressionControlBusFactoryBean();
        return expressionControlBusFactoryBean;
    }

The following code snippet is the task which does all the things, at first starting the SI Integration Flow to get a file from SFTP server and processing its content:

    @Scheduled(cron = "${harry.potter.tasks.my-task.frequency}")
    public void runAsTask() {

        if (env.isEnabled() && myTaskEnv.isEnabled()) {
            LOG.info("runAsTask");
            jobLOG.info("---------------------------------------------------------------");
            jobLOG.info("Job started: Storing Event Data");
        } else {
            LOG.debug("runAsTask env={}, myTask={}", env.isEnabled(), myTaskEnv.isEnabled());
            LOG.debug("runAsTask {} not enabled: aborted.", jobId);
            return;
        }

        try {

            LOG.debug("runAsTask globally enabled={}", env.isEnabled());
            /* ... */
    
            List<MyEvent> myEvents = null;
            try {           
                myEvents = getFile();
            }
            catch (Exception e) {
                LOG.error("{} failed! Exception {}!!!", jobId, e.getClass().getName());
                LOG.debug("... but continuing ...");
            }

            /* ... createEvents ... */

            /* ... cleanData ... */

            /* ... storeEvents ... */

            /* ... cleanHistoricData ... */

            jobLOG.info("Okay, send okay mail");
            sendOkayMail();

        }
        catch (Exception e) {
            LOG.error("{} failed! Exception: {}", jobId, e);
            jobLOG.info("Job has errors, send error mail");
            sendErrorMail();
        }
        finally {
            LOG.info("{} finished", jobId);
            jobLOG.info("Job finished");
        }
    }

The most important section to get the file from SFTP is the following getFile() method. It sends the start command to the sftpInboundAdapter, then initiates a CountDownLatch with count 1 to wait (a maximum of 20 seconds) for the remote file to be fetched. If the file was fetched within this timeout period, its entities are returned, otherwise ther is nothing to return (null):

EDIT 1:

Add the ChannelInterceptorbefore starting the SftpInboundAdapter:

@Autowired
AbstractMessageChannel sftpInputChannel;

@Autowired
MessageChannel controlChannel;

public List<MyEvent> getFile() {
        LOG.debug("getFile");

        final List<MyEvent> events = new ArrayList<>();

        CountDownLatch latch = new CountDownLatch(1);

        sftpInputChannel.addInterceptor(new ChannelInterceptor() {
            // capture the message and count down the latch
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                LOG.debug("postSend channel={}, sent={}, message={}",
                        channel.toString(), sent, message.getPayload());

                // read and transform the content
                Arrays.stream((MyEvent[]) message.getPayload()).forEach(e -> events.add(e));

                // signal: work has done
                latch.countDown();
                ChannelInterceptor.super.postSend(message, channel, sent);
            }
            
        });

        boolean sent = controlChannel.send(new GenericMessage<>("@sftpInboundAdapter.start()"));
        LOG.debug("getFile 'Start' control message sent successfully={}", sent);

        try {
            LOG.debug("getFile, waiting for the file to be received, timeout=30s");
            if (latch.await(20, TimeUnit.SECONDS)) {
                return events;
            }
        }
        catch (InterruptedException e) {
            LOG.warn("getFile, job was interrupted");
            throw new IllegalStateException("expected file not available");
        }

        return null;
    }

When it runs my expectation was that the SI beans get established in the main thread at first. Then another thread is started by the Spring Scheduler to execute the task. When sending the start message over the control channel, another thread would start to begin negotiating with the SFTP server and transferring the file while the task-thread enters the wait for the countdown latch to reach zero.

The logging shows another scenario. Sending the start command over the control channel has no effect on the number of threads which doing something. So there is no activity while the task thread enters the wait for the latch, but there is no-one to countdown. Consequently the full 20 seconds pass and the getFile() method returns without data.

As soon as the task-thread has finished, the JSCH module starts negotiating with the SFTP server. The transfer of the file does not work yet but this should be of no interest at the moment.

Here is the logging:

2021-09-16 13:54:34.877  INFO [] --- [http-nio-8080-exec-55] com.harry.potter.Application         : The following profiles are active: test
2021-09-16 13:54:37.129  INFO [] --- [http-nio-8080-exec-55]  o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2021-09-16 13:54:37.418  INFO [] --- [http-nio-8080-exec-55]  o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.errorChannel' has 1 subscriber(s).
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : Adding {json-to-object-transformer} as a subscriber to the 'testSftpInboundFlow.channel#0' channel
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.integration.channel.DirectChannel    : Channel 'application-1.testSftpInboundFlow.channel#0' has 1 subscriber(s).
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : started bean 'testSftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler:mySftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : started bean 'mySftpConfiguration.handler.serviceActivator'
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler:mySftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.sftpInputChannel' has 2 subscriber(s).
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : started bean 'mySftpConfiguration.deleteLocalFileService.serviceActivator'
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : Adding {service-activator:mySftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.integration.channel.DirectChannel    : Channel 'application-1.controlChannel' has 1 subscriber(s).
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : started bean 'mySftpConfiguration.controlBus.serviceActivator'
2021-09-16 13:54:37.614  INFO [] --- [http-nio-8080-exec-55]  o.s.c.n.eureka.InstanceInfoFactory       : Setting initial instance status as: STARTING
2021-09-16 13:54:38.458  INFO [] --- [http-nio-8080-exec-55]  c.l.job.MyApplication         : Started MyApplication in 4.539 seconds (JVM running for 88524.557)
2021-09-16 13:55:00.000  INFO [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : runAsTask: 
2021-09-16 13:55:00.000  INFO [] --- [scheduling-1]  job-MyEvents                             : ---------------------------------------------------------------
2021-09-16 13:55:00.000  INFO [] --- [scheduling-1]  job-MyEvents                             : Job started: 
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : runAsTask globally enabled=true
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : runAsTask enabled=true
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : runAsTask monthsKeepBackwards=12
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : runAsTask monthsUpdateFrameRelStart=2
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : runAsTask monthsUpdateFrameRelEnd=null
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : getFile
2021-09-16 13:55:00.023  INFO [] --- [scheduling-1]  o.s.i.e.SourcePollingChannelAdapter      : started bean 'sftpInboundAdapter'; defined in: 'class path resource [job/config/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:55:00.024 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : getFile 'Start' control message sent successfully=true
2021-09-16 13:55:00.025 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : getFile, waiting for the file to be received, timeout=20s
2021-09-16 13:55:20.026 ERROR [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : MyEvents          failed! Exception java.lang.IllegalStateException!!!
2021-09-16 13:55:20.026 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : ... but continuing ...
2021-09-16 13:55:20.026 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : cleanData
2021-09-16 13:55:20.040 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : cleanData refDateFrom=2021-11-01, refDateTo=null
2021-09-16 13:55:20.042 ERROR [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : MyEvents          failed! Exception: {}

java.lang.NullPointerException: null
    at com.harry.potter.job.MyEvents                Task.cleanData(MyEvents         Task.java:187)
    at com.harry.potter.job.MyEvents                Task.runAsTask(MyEvents         Task.java:99)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:836)

2021-09-16 13:55:20.042  INFO [] --- [scheduling-1]  job-MyEvents                             : Job has errors, send error mail
2021-09-16 13:55:20.042 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : sendErrorMail
2021-09-16 13:55:20.192  INFO [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : MyEvents          finished
2021-09-16 13:55:20.192  INFO [] --- [scheduling-1]  job-MyEvents                             : Job finished
2021-09-16 13:55:20.205  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Connecting to myHost   port 22
2021-09-16 13:55:20.207  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Connection established
2021-09-16 13:55:20.217  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Remote version string: SSH-2.0-OpenSSH_6.6.1p1 Debian-4~bpo70+1
2021-09-16 13:55:20.217  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Local version string: SSH-2.0-JSCH-0.1.54
2021-09-16 13:55:20.217  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : CheckCiphers: aes256-ctr,aes192-ctr,aes128-ctr,aes256-cbc,aes192-cbc,aes128-cbc,3des-ctr,arcfour,arcfour128,arcfour256
2021-09-16 13:55:20.220  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : CheckKexes: diffie-hellman-group14-sha1,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521
2021-09-16 13:55:20.257  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : CheckSignatures: ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : SSH_MSG_KEXINIT sent
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : SSH_MSG_KEXINIT received
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: curve25519-sha256@libssh.org,diffie-hellman-group-exchange-sha256,diffie-hellman-group-exchange-sha1,diffie-hellman-group14-sha1,diffie-hellman-group-exchange-sha1,diffie-hellman-group1-sha1
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: ssh-rsa,ssh-ed25519
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: chacha20-poly1305@openssh.com,aes256-gcm@openssh.com,aes128-gcm@openssh.com,aes256-ctr,aes192-ctr,aes128-ctr,aes128-cbc
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: chacha20-poly1305@openssh.com,aes256-gcm@openssh.com,aes128-gcm@openssh.com,aes256-ctr,aes192-ctr,aes128-ctr,aes128-cbc
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: hmac-sha2-512-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-ripemd160-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-512,hmac-sha2-256,hmac-ripemd160,umac-128@openssh.com,hmac-sha1-96
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: hmac-sha2-512-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-ripemd160-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-512,hmac-sha2-256,hmac-ripemd160,umac-128@openssh.com,hmac-sha1-96
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: none,zlib@openssh.com
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: none,zlib@openssh.com
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: 
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: 
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group14-sha1,diffie-hellman-group-exchange-sha256,diffie-hellman-group-exchange-sha1,diffie-hellman-group1-sha1
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: ssh-rsa,ssh-dss,ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: aes128-ctr,aes128-cbc,3des-ctr,3des-cbc,blowfish-cbc,aes192-ctr,aes192-cbc,aes256-ctr,aes256-cbc
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: aes128-ctr,aes128-cbc,3des-ctr,3des-cbc,blowfish-cbc,aes192-ctr,aes192-cbc,aes256-ctr,aes256-cbc
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: hmac-md5,hmac-sha1,hmac-sha2-256,hmac-sha1-96,hmac-md5-96
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: hmac-md5,hmac-sha1,hmac-sha2-256,hmac-sha1-96,hmac-md5-96
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: none
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: none
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: 
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: 
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server->client aes128-ctr hmac-sha2-256 none
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client->server aes128-ctr hmac-sha2-256 none
2021-09-16 13:55:20.275  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : SSH_MSG_KEXDH_INIT sent
2021-09-16 13:55:20.275  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : expecting SSH_MSG_KEXDH_REPLY
2021-09-16 13:55:20.296  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : ssh_rsa_verify: signature true
2021-09-16 13:55:20.298  INFO [] --- [scheduling-1]  o.s.i.s.s.DefaultSftpSessionFactory      : The authenticity of host 'myHost  ' can't be established.
RSA key fingerprint is 4d:fe:f9:35:08:20:2e:76:76:55:7a:1d:5d:5d:1c:90.
Are you sure you want to continue connecting?
2021-09-16 13:55:20.298  WARN [] --- [scheduling-1]  com.jcraft.jsch                          : Permanently added 'myHost  ' (RSA) to the list of known hosts.
2021-09-16 13:55:20.299  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : SSH_MSG_NEWKEYS sent
2021-09-16 13:55:20.299  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : SSH_MSG_NEWKEYS received
2021-09-16 13:55:20.300  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : SSH_MSG_SERVICE_REQUEST sent
2021-09-16 13:55:20.300  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : SSH_MSG_SERVICE_ACCEPT received
2021-09-16 13:55:20.307  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Authentications that can continue: publickey,keyboard-interactive,password
2021-09-16 13:55:20.307  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Next authentication method: publickey
2021-09-16 13:55:20.307  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Authentications that can continue: keyboard-interactive,password
2021-09-16 13:55:20.307  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Next authentication method: keyboard-interactive
2021-09-16 13:55:20.409  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Disconnecting from myHost   port 22
2021-09-16 13:55:20.419 ERROR [] --- [scheduling-1]  o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Problem occurred while synchronizing 'data' to local directory; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item
    ...
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item
    ...
Caused by: org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item
    ...
Caused by: java.lang.IllegalStateException: failed to create SFTP Session
    ...
   Caused by: java.lang.IllegalStateException: failed to connect
    ...
   Caused by: com.jcraft.jsch.JSchException: Auth cancel
    ...
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.e.SourcePollingChannelAdapter      : stopped bean 'sftpInboundAdapter'; defined in: 'class path resource [job/config/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {json-to-object-transformer} as a subscriber to the 'testSftpInboundFlow.channel#0' channel
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.integration.channel.DirectChannel    : Channel 'application-1.testSftpInboundFlow.channel#0' has 0 subscriber(s).
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'testSftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [job/config/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.errorChannel' has 0 subscriber(s).
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean '_org.springframework.integration.errorLogger'
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {message-handler:MySftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'MySftpConfiguration.handler.serviceActivator'
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {message-handler:MySftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.sftpInputChannel' has 0 subscriber(s).
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'MySftpConfiguration.deleteLocalFileService.serviceActivator'
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {service-activator:MySftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-09-16 13:55:28.856  INFO [] --- [Catalina-utility-2]  o.s.integration.channel.DirectChannel    : Channel 'application-1.controlChannel' has 0 subscriber(s).
2021-09-16 13:55:28.856  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'MySftpConfiguration.controlBus.serviceActivator'
2021-09-16 13:55:28.857  INFO [] --- [Catalina-utility-2]  o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'
2021-09-16 13:55:28.858  INFO [] --- [Catalina-utility-2]  o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'

What am I doing wrong?


Solution

  • Since you say that both tasks starts on the same thread, then it looks like you deal with the latest Spring Boot: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#features.spring-integration. When Spring Integration now relies on the auto-configured TaskScheduler which comes with one thread in its pool.

    You can change that configuration, or your can add a task-executor to the poller of your sftpInboundAdapter Inbound Channel Adapter definition: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#taskexecutor-support. This way a real job is going to be shifted from a scheduler thread to one provided by that executor.