My scenario: I have a Spring Integration flow with an SFTP inbound adapter that fetches a file from the SFTP server "myHost". The file contains JSON, which is converted into MyEvent entities and forwarded for further processing. The process runs in a task scheduled by the Spring scheduler, so the integration flow must not start automatically with the application; that is why the adapter has autoStartup(false).
The Spring Integration Flow is:
The types TransferContext, TransferChannel and MyService are Java classes of mine with fields fed from YAML properties. They provide the values for the sftpSessionFactory (host, port, user, password) and for the sftpInboundAdapter (remoteDirectory, remoteFilename, preserveTimestamp, localDirectory, etc.). The service processes the MyEvent entities.
These are my SI beans:
@Configuration
public class MySftpConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(MySftpConfiguration.class);
@Bean
public SessionFactory<LsEntry> testSftpSessionFactory(TransferChannel transferChannel) {
LOG.debug("testSftpSessionFactory");
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost(transferChannel.getHost());
sf.setPort(transferChannel.getPort());
sf.setUser(transferChannel.getUser());
sf.setPassword(transferChannel.getPassword());
sf.setAllowUnknownKeys(true);
return new CachingSessionFactory<LsEntry>(sf);
}
@Bean
public IntegrationFlow testSftpInboundFlow(TransferContext context) {
LOG.debug("testSftpInboundFlow");
return IntegrationFlows
.from(Sftp.inboundAdapter(testSftpSessionFactory(context.getChannel()))
.preserveTimestamp(context.isPreserveTimestamp())
.remoteDirectory(context.getRemoteDir())
.regexFilter(context.getRemoteFilename())
.deleteRemoteFiles(context.isRemoteRemove())
.autoCreateLocalDirectory(context.isAutoCreateLocalDir())
.localFilenameExpression(context.getLocalFilename())
.localDirectory(new File(context.getLocalDir())),
e -> e.id("sftpInboundAdapter")
.autoStartup(false)
.poller(Pollers.fixedDelay(5000))
)
.transform(Transformers.fromJson(MyEvent[].class))
.channel("sftpInputChannel")
.get();
}
@Bean
public PublishSubscribeChannel sftpInputChannel() {
LOG.debug("sftpInputChannel");
return new PublishSubscribeChannel();
}
@Bean
@ServiceActivator(inputChannel = "sftpInputChannel")
public MessageHandler handler() {
LOG.debug("MessageHandler-handler");
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
LOG.debug("handleMessage message={}", message);
myService().myServiceMethod((MyEvent[]) message.getPayload());
}
};
}
@Bean
@ServiceActivator(inputChannel = "sftpInputChannel")
public MessageHandler deleteLocalFileService() {
/* ... */
}
@Bean
public MessageChannel controlChannel() {
LOG.debug("controlChannel");
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "controlChannel")
public ExpressionControlBusFactoryBean controlBus() {
LOG.debug("controlBus");
ExpressionControlBusFactoryBean expressionControlBusFactoryBean = new ExpressionControlBusFactoryBean();
return expressionControlBusFactoryBean;
}
}
The following snippet is the task that does all the work. It first starts the integration flow to fetch the file from the SFTP server and then processes the file's content:
@Scheduled(cron = "${harry.potter.tasks.my-task.frequency}")
public void runAsTask() {
if (env.isEnabled() && myTaskEnv.isEnabled()) {
LOG.info("runAsTask");
jobLOG.info("---------------------------------------------------------------");
jobLOG.info("Job started: Storing Event Data");
} else {
LOG.debug("runAsTask env={}, myTask={}", env.isEnabled(), myTaskEnv.isEnabled());
LOG.debug("runAsTask {} not enabled: aborted.", jobId);
return;
}
try {
LOG.debug("runAsTask globally enabled={}", env.isEnabled());
/* ... */
List<MyEvent> myEvents = null;
try {
myEvents = getFile();
}
catch (Exception e) {
LOG.error("{} failed! Exception {}!!!", jobId, e.getClass().getName());
LOG.debug("... but continuing ...");
}
/* ... createEvents ... */
/* ... cleanData ... */
/* ... storeEvents ... */
/* ... cleanHistoricData ... */
jobLOG.info("Okay, send okay mail");
sendOkayMail();
}
catch (Exception e) {
LOG.error("{} failed! Exception: {}", jobId, e);
jobLOG.info("Job has errors, send error mail");
sendErrorMail();
}
finally {
LOG.info("{} finished", jobId);
jobLOG.info("Job finished");
}
}
The most important part of fetching the file from SFTP is the following getFile() method. It sends the start command to the sftpInboundAdapter, then waits on a CountDownLatch with count 1 (for a maximum of 20 seconds) for the remote file to be fetched. If the file is fetched within this timeout, its entities are returned; otherwise there is nothing to return (null):
EDIT 1: Add the ChannelInterceptor before starting the SftpInboundAdapter:
@Autowired
AbstractMessageChannel sftpInputChannel;
@Autowired
MessageChannel controlChannel;
public List<MyEvent> getFile() {
LOG.debug("getFile");
final List<MyEvent> events = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
sftpInputChannel.addInterceptor(new ChannelInterceptor() {
// capture the message and count down the latch
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
LOG.debug("postSend channel={}, sent={}, message={}",
channel.toString(), sent, message.getPayload());
// read and transform the content
Arrays.stream((MyEvent[]) message.getPayload()).forEach(e -> events.add(e));
// signal: work has done
latch.countDown();
ChannelInterceptor.super.postSend(message, channel, sent);
}
});
boolean sent = controlChannel.send(new GenericMessage<>("@sftpInboundAdapter.start()"));
LOG.debug("getFile 'Start' control message sent successfully={}", sent);
try {
LOG.debug("getFile, waiting for the file to be received, timeout=20s");
if (latch.await(20, TimeUnit.SECONDS)) {
return events;
}
}
catch (InterruptedException e) {
LOG.warn("getFile, job was interrupted");
throw new IllegalStateException("expected file not available");
}
return null;
}
When it runs, I expected the SI beans to be set up in the main thread first. Then the Spring scheduler would start another thread to execute the task. Sending the start message over the control channel would start yet another thread, which negotiates with the SFTP server and transfers the file while the task thread waits for the countdown latch to reach zero.
The logging shows a different picture. Sending the start command over the control channel does not start any additional thread. Nothing happens while the task thread waits on the latch, because there is no one to count it down. Consequently the full 20 seconds pass and the getFile() method returns without data.
As soon as the task thread has finished, the JSch module starts negotiating with the SFTP server, on that same thread (scheduling-1). The file transfer itself does not work yet, but that is not the point at the moment.
Here is the logging:
2021-09-16 13:54:34.877 INFO [] --- [http-nio-8080-exec-55] com.harry.potter.Application : The following profiles are active: test
2021-09-16 13:54:37.129 INFO [] --- [http-nio-8080-exec-55] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2021-09-16 13:54:37.418 INFO [] --- [http-nio-8080-exec-55] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.errorChannel' has 1 subscriber(s).
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : Adding {json-to-object-transformer} as a subscriber to the 'testSftpInboundFlow.channel#0' channel
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.integration.channel.DirectChannel : Channel 'application-1.testSftpInboundFlow.channel#0' has 1 subscriber(s).
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : started bean 'testSftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : Adding {message-handler:mySftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : started bean 'mySftpConfiguration.handler.serviceActivator'
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : Adding {message-handler:mySftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 2 subscriber(s).
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : started bean 'mySftpConfiguration.deleteLocalFileService.serviceActivator'
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : Adding {service-activator:mySftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.integration.channel.DirectChannel : Channel 'application-1.controlChannel' has 1 subscriber(s).
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : started bean 'mySftpConfiguration.controlBus.serviceActivator'
2021-09-16 13:54:37.614 INFO [] --- [http-nio-8080-exec-55] o.s.c.n.eureka.InstanceInfoFactory : Setting initial instance status as: STARTING
2021-09-16 13:54:38.458 INFO [] --- [http-nio-8080-exec-55] c.l.job.MyApplication : Started MyApplication in 4.539 seconds (JVM running for 88524.557)
2021-09-16 13:55:00.000 INFO [] --- [scheduling-1] c.l.c.job.MyEvents Task : runAsTask:
2021-09-16 13:55:00.000 INFO [] --- [scheduling-1] job-MyEvents : ---------------------------------------------------------------
2021-09-16 13:55:00.000 INFO [] --- [scheduling-1] job-MyEvents : Job started:
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : runAsTask globally enabled=true
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : runAsTask enabled=true
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : runAsTask monthsKeepBackwards=12
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : runAsTask monthsUpdateFrameRelStart=2
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : runAsTask monthsUpdateFrameRelEnd=null
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : getFile
2021-09-16 13:55:00.023 INFO [] --- [scheduling-1] o.s.i.e.SourcePollingChannelAdapter : started bean 'sftpInboundAdapter'; defined in: 'class path resource [job/config/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:55:00.024 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : getFile 'Start' control message sent successfully=true
2021-09-16 13:55:00.025 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : getFile, waiting for the file to be received, timeout=20s
2021-09-16 13:55:20.026 ERROR [] --- [scheduling-1] c.l.c.job.MyEvents Task : MyEvents failed! Exception java.lang.IllegalStateException!!!
2021-09-16 13:55:20.026 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : ... but continuing ...
2021-09-16 13:55:20.026 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : cleanData
2021-09-16 13:55:20.040 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : cleanData refDateFrom=2021-11-01, refDateTo=null
2021-09-16 13:55:20.042 ERROR [] --- [scheduling-1] c.l.c.job.MyEvents Task : MyEvents failed! Exception: {}
java.lang.NullPointerException: null
at com.harry.potter.job.MyEvents Task.cleanData(MyEvents Task.java:187)
at com.harry.potter.job.MyEvents Task.runAsTask(MyEvents Task.java:99)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:836)
2021-09-16 13:55:20.042 INFO [] --- [scheduling-1] job-MyEvents : Job has errors, send error mail
2021-09-16 13:55:20.042 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : sendErrorMail
2021-09-16 13:55:20.192 INFO [] --- [scheduling-1] c.l.c.job.MyEvents Task : MyEvents finished
2021-09-16 13:55:20.192 INFO [] --- [scheduling-1] job-MyEvents : Job finished
2021-09-16 13:55:20.205 INFO [] --- [scheduling-1] com.jcraft.jsch : Connecting to myHost port 22
2021-09-16 13:55:20.207 INFO [] --- [scheduling-1] com.jcraft.jsch : Connection established
2021-09-16 13:55:20.217 INFO [] --- [scheduling-1] com.jcraft.jsch : Remote version string: SSH-2.0-OpenSSH_6.6.1p1 Debian-4~bpo70+1
2021-09-16 13:55:20.217 INFO [] --- [scheduling-1] com.jcraft.jsch : Local version string: SSH-2.0-JSCH-0.1.54
2021-09-16 13:55:20.217 INFO [] --- [scheduling-1] com.jcraft.jsch : CheckCiphers: aes256-ctr,aes192-ctr,aes128-ctr,aes256-cbc,aes192-cbc,aes128-cbc,3des-ctr,arcfour,arcfour128,arcfour256
2021-09-16 13:55:20.220 INFO [] --- [scheduling-1] com.jcraft.jsch : CheckKexes: diffie-hellman-group14-sha1,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521
2021-09-16 13:55:20.257 INFO [] --- [scheduling-1] com.jcraft.jsch : CheckSignatures: ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_KEXINIT sent
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_KEXINIT received
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: curve25519-sha256@libssh.org,diffie-hellman-group-exchange-sha256,diffie-hellman-group-exchange-sha1,diffie-hellman-group14-sha1,diffie-hellman-group-exchange-sha1,diffie-hellman-group1-sha1
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: ssh-rsa,ssh-ed25519
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: chacha20-poly1305@openssh.com,aes256-gcm@openssh.com,aes128-gcm@openssh.com,aes256-ctr,aes192-ctr,aes128-ctr,aes128-cbc
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: chacha20-poly1305@openssh.com,aes256-gcm@openssh.com,aes128-gcm@openssh.com,aes256-ctr,aes192-ctr,aes128-ctr,aes128-cbc
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: hmac-sha2-512-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-ripemd160-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-512,hmac-sha2-256,hmac-ripemd160,umac-128@openssh.com,hmac-sha1-96
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: hmac-sha2-512-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-ripemd160-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-512,hmac-sha2-256,hmac-ripemd160,umac-128@openssh.com,hmac-sha1-96
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: none,zlib@openssh.com
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: none,zlib@openssh.com
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server:
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server:
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group14-sha1,diffie-hellman-group-exchange-sha256,diffie-hellman-group-exchange-sha1,diffie-hellman-group1-sha1
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: ssh-rsa,ssh-dss,ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: aes128-ctr,aes128-cbc,3des-ctr,3des-cbc,blowfish-cbc,aes192-ctr,aes192-cbc,aes256-ctr,aes256-cbc
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: aes128-ctr,aes128-cbc,3des-ctr,3des-cbc,blowfish-cbc,aes192-ctr,aes192-cbc,aes256-ctr,aes256-cbc
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: hmac-md5,hmac-sha1,hmac-sha2-256,hmac-sha1-96,hmac-md5-96
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: hmac-md5,hmac-sha1,hmac-sha2-256,hmac-sha1-96,hmac-md5-96
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: none
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: none
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client:
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client:
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server->client aes128-ctr hmac-sha2-256 none
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client->server aes128-ctr hmac-sha2-256 none
2021-09-16 13:55:20.275 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_KEXDH_INIT sent
2021-09-16 13:55:20.275 INFO [] --- [scheduling-1] com.jcraft.jsch : expecting SSH_MSG_KEXDH_REPLY
2021-09-16 13:55:20.296 INFO [] --- [scheduling-1] com.jcraft.jsch : ssh_rsa_verify: signature true
2021-09-16 13:55:20.298 INFO [] --- [scheduling-1] o.s.i.s.s.DefaultSftpSessionFactory : The authenticity of host 'myHost ' can't be established.
RSA key fingerprint is 4d:fe:f9:35:08:20:2e:76:76:55:7a:1d:5d:5d:1c:90.
Are you sure you want to continue connecting?
2021-09-16 13:55:20.298 WARN [] --- [scheduling-1] com.jcraft.jsch : Permanently added 'myHost ' (RSA) to the list of known hosts.
2021-09-16 13:55:20.299 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_NEWKEYS sent
2021-09-16 13:55:20.299 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_NEWKEYS received
2021-09-16 13:55:20.300 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_SERVICE_REQUEST sent
2021-09-16 13:55:20.300 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_SERVICE_ACCEPT received
2021-09-16 13:55:20.307 INFO [] --- [scheduling-1] com.jcraft.jsch : Authentications that can continue: publickey,keyboard-interactive,password
2021-09-16 13:55:20.307 INFO [] --- [scheduling-1] com.jcraft.jsch : Next authentication method: publickey
2021-09-16 13:55:20.307 INFO [] --- [scheduling-1] com.jcraft.jsch : Authentications that can continue: keyboard-interactive,password
2021-09-16 13:55:20.307 INFO [] --- [scheduling-1] com.jcraft.jsch : Next authentication method: keyboard-interactive
2021-09-16 13:55:20.409 INFO [] --- [scheduling-1] com.jcraft.jsch : Disconnecting from myHost port 22
2021-09-16 13:55:20.419 ERROR [] --- [scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Problem occurred while synchronizing 'data' to local directory; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item
...
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item
...
Caused by: org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item
...
Caused by: java.lang.IllegalStateException: failed to create SFTP Session
...
Caused by: java.lang.IllegalStateException: failed to connect
...
Caused by: com.jcraft.jsch.JSchException: Auth cancel
...
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.e.SourcePollingChannelAdapter : stopped bean 'sftpInboundAdapter'; defined in: 'class path resource [job/config/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {json-to-object-transformer} as a subscriber to the 'testSftpInboundFlow.channel#0' channel
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.integration.channel.DirectChannel : Channel 'application-1.testSftpInboundFlow.channel#0' has 0 subscriber(s).
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'testSftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [job/config/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.errorChannel' has 0 subscriber(s).
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {message-handler:MySftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'MySftpConfiguration.handler.serviceActivator'
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {message-handler:MySftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 0 subscriber(s).
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'MySftpConfiguration.deleteLocalFileService.serviceActivator'
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {service-activator:MySftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-09-16 13:55:28.856 INFO [] --- [Catalina-utility-2] o.s.integration.channel.DirectChannel : Channel 'application-1.controlChannel' has 0 subscriber(s).
2021-09-16 13:55:28.856 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'MySftpConfiguration.controlBus.serviceActivator'
2021-09-16 13:55:28.857 INFO [] --- [Catalina-utility-2] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler'
2021-09-16 13:55:28.858 INFO [] --- [Catalina-utility-2] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
What am I doing wrong?
Since you say that both tasks start on the same thread, it looks like you are on the latest Spring Boot: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#features.spring-integration. There, Spring Integration relies on the auto-configured TaskScheduler, which comes with one thread in its pool.
You can change that configuration, or you can add a task-executor to the poller of your sftpInboundAdapter Inbound Channel Adapter definition: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#taskexecutor-support. This way the real work is shifted from the scheduler thread to one provided by that executor.
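The one-thread effect can be reproduced without Spring. The following is only a minimal sketch using plain java.util.concurrent, not the actual Spring Integration internals: a "job" runs on a scheduler, queues a "poll" on the same scheduler, and waits for it on a latch. The class name SchedulerStarvationDemo and the method pollRunsWhileJobWaits are made up for this illustration, and the 500 ms timeout stands in for the 20 seconds in getFile().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class SchedulerStarvationDemo {

    /**
     * Runs a "job" on a scheduler with the given pool size. The job queues a
     * "poll" on the same scheduler and waits for it on a latch.
     * Returns true if the poll ran before the wait timed out.
     */
    public static boolean pollRunsWhileJobWaits(int poolSize, long timeoutMillis) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(poolSize);
        try {
            Callable<Boolean> job = () -> {
                CountDownLatch latch = new CountDownLatch(1);
                // Stands in for the adapter's poll, queued on the same scheduler.
                Runnable poll = latch::countDown;
                scheduler.schedule(poll, 0, TimeUnit.MILLISECONDS);
                // Stands in for getFile() waiting for the file to arrive.
                return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            };
            ScheduledFuture<Boolean> result = scheduler.schedule(job, 0, TimeUnit.MILLISECONDS);
            return result.get();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // One thread: the job occupies it, so the poll cannot run until the job gives up.
        System.out.println("pool size 1: " + pollRunsWhileJobWaits(1, 500)); // prints "pool size 1: false"
        // Two threads: the poll runs on the second thread and releases the latch.
        System.out.println("pool size 2: " + pollRunsWhileJobWaits(2, 500)); // prints "pool size 2: true"
    }
}
```

With a pool size of 1 the wait always times out, which matches the 20 idle seconds in the log above followed by the JSch output on scheduling-1. In a Spring Boot application the scheduler pool size is normally raised through the spring.task.scheduling.pool.size property; check that against the Boot version in use.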