
Strategy to refresh/update SessionFactory in spring integration


Hi, I am using Spring Integration extensively in my project. In the current case I create my FTP and SFTP adapters dynamically using dynamic flow registration. I also create the session factories dynamically, based on persisted configuration for each unique connection.

This works great, but sometimes I need to modify an existing session configuration at runtime, and then the session factory has to be refreshed with the new configuration. This can happen, for example, when credentials change.

To do this, I am considering two approaches:

  1. Remove the dynamic flow via flowContext.remove(flowId). But this does not seem to kill the flow; I still see the old session factory and flow running.
  2. If there is a way to associate a running adapter with a new SessionFactory dynamically, that would also work. But I have not found a way to accomplish this.
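
To make option 2 concrete, here is a minimal plain-Java sketch of a session factory wrapper whose delegate can be swapped at runtime. The types RemoteSession, RemoteSessionFactory and SwappableSessionFactory are hypothetical stand-ins invented for illustration, not Spring Integration classes; a real version would implement Spring Integration's SessionFactory interface and would also have to release the sessions cached by the old factory.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a remote session; not the Spring Integration type. */
interface RemoteSession {
    String user();
}

/** Hypothetical stand-in for a session factory. */
interface RemoteSessionFactory {
    RemoteSession getSession();
}

/** Hands out sessions from whichever delegate is current; the delegate can be replaced at runtime. */
final class SwappableSessionFactory implements RemoteSessionFactory {

    private final AtomicReference<RemoteSessionFactory> delegate;

    SwappableSessionFactory(RemoteSessionFactory initial) {
        this.delegate = new AtomicReference<>(Objects.requireNonNull(initial));
    }

    /** Installs the replacement and returns the previous delegate so the caller can clean it up. */
    RemoteSessionFactory swap(RemoteSessionFactory replacement) {
        return this.delegate.getAndSet(Objects.requireNonNull(replacement));
    }

    @Override
    public RemoteSession getSession() {
        return this.delegate.get().getSession();
    }
}
```

The adapter would hold on to the wrapper for its whole life, and only the wrapped factory would change when the credentials change.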

Please help

UPDATE

My dynamic registration code is below:

 CachingSessionFactory<FTPFile> csf = cache.get(feed.getConnectionId());
    IntegrationFlow flow = IntegrationFlows
                .from(inboundAdapter(csf).preserveTimestamp(true)//
                      .remoteDirectory(feed.getRemoteDirectory())//
                      .regexFilter(feed.getRegexFilter())//
                      .deleteRemoteFiles(feed.getDeleteRemoteFiles())
                      .autoCreateLocalDirectory(feed.getAutoCreateLocalDirectory())
                      .localFilenameExpression(feed.getLocalFilenameExpression())//
                      .localFilter(localFileFilter)//
                      .localDirectory(new File(feed.getLocalDirectory())),
                      e -> e.id(inboundAdapter.get(feed.getId())).autoStartup(false)
                            .poller(Pollers//
                                  .cron(feed.getPollingFreq())//
                                  .maxMessagesPerPoll(1)//
                                  .advice(retryAdvice)))
                .enrichHeaders(s -> s.header(HEADER.feed.name(), feed))//
                .filter(selector)//
                .handle(fcHandler)//
                .handle(fileValidationHandler)//
                .channel(ftbSubscriber)//
                .get();

          this.flowContext.registration(flow).addBean(csf).//
                id(inboundFlow.get(feed.getId())).//
                autoStartup(false).register();

I am trying to remove it via

flowContext.remove(flowId);

Even after the removal, the poller and adapter still appear to be active:

java.lang.IllegalStateException: failed to create FTPClient
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:275)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:200)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:62)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:134)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:224)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:245)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:65)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
at com.sun.proxy.$Proxy188.call(Unknown Source)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

After Gary's comments: I changed the order of the chain to match his example and removed autoStartup from the flowContext registration chain, and now the polling adapter appears to be removed. However, it looks like the bug is still there if autoStartup is true.

   this.flowContext.registration(flow).//
        id(inboundFlow.get(feed.getId()))//
        .addBean(sessionFactory.get(feed.getId()), csf)//
        .register();

After more research: StandardIntegrationFlow.start() starts all the components inside the flow irrespective of their autoStartup status. I guess we need to check isAutoStartup() for these as well, and only start them if autoStartup is true when starting the IntegrationFlow. The existing code of StandardIntegrationFlow is below. Is there a way to override this, or does this need a PR or fix?

    if (!this.running) {
        ListIterator<Object> iterator = this.integrationComponents.listIterator(this.integrationComponents.size());
        this.lifecycles.clear();
        while (iterator.hasPrevious()) {
            Object component = iterator.previous();
            if (component instanceof SmartLifecycle) {
                this.lifecycles.add((SmartLifecycle) component);
                ((SmartLifecycle) component).start();
            }
        }
        this.running = true;
    }
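
The change proposed above can be sketched in plain Java as follows. This is only an illustration of the idea, not the framework's code or an actual fix: Startable, SimpleComponent and FlowStarter are hypothetical names, with Startable modelled loosely on the isAutoStartup()/start() part of SmartLifecycle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

/** Hypothetical minimal lifecycle contract, modelled loosely on SmartLifecycle. */
interface Startable {
    void start();
    boolean isRunning();
    boolean isAutoStartup();
}

/** Trivial component that only records whether it has been started. */
final class SimpleComponent implements Startable {

    private final boolean autoStartup;
    private boolean running;

    SimpleComponent(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    @Override
    public void start() {
        this.running = true;
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    @Override
    public boolean isAutoStartup() {
        return this.autoStartup;
    }
}

final class FlowStarter {

    private FlowStarter() {
    }

    /** Walks the components in reverse order and starts only those with autoStartup=true. */
    static List<Startable> startAutoStartupOnly(List<? extends Startable> components) {
        List<Startable> started = new ArrayList<>();
        ListIterator<? extends Startable> iterator = components.listIterator(components.size());
        while (iterator.hasPrevious()) {
            Startable component = iterator.previous();
            if (component.isAutoStartup()) {
                component.start();
                started.add(component);
            }
        }
        return started;
    }
}
```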

Solution

  • remove() should shut everything down. If you are using a CachingSessionFactory, it also needs to be destroy()ed so that it closes the cached sessions.

    The flow will automatically destroy() the bean if you add it to the registration (using addBean()).

    If you can edit your question to show your dynamic registration code, I can take a look.

    EDIT

    Everything works fine for me...

    @SpringBootApplication
    public class So43916317Application implements CommandLineRunner {
    
        public static void main(String[] args) {
            SpringApplication.run(So43916317Application.class, args).close();
        }
    
        @Autowired
        private IntegrationFlowContext context;
    
        @Override
        public void run(String... args) throws Exception {
            CSF csf = new CSF(sf());
            IntegrationFlow flow = IntegrationFlows.from(Ftp.inboundAdapter(csf)
                        .localDirectory(new File("/tmp/foo"))
                        .remoteDirectory("bar"), e -> e.poller(Pollers.fixedDelay(1_000)))
                    .handle(System.out::println)
                    .get();
            this.context.registration(flow)
                .id("foo")
                .addBean(csf)
                .register();
            Thread.sleep(10_000);
            System.out.println("removing flow");
            this.context.remove("foo");
            System.out.println("destroying csf");
            csf.destroy();
            Thread.sleep(10_000);
            System.out.println("exiting");
            Assert.state(csf.destroyCalled, "destroy not called");
        }
    
        @Bean
        public DefaultFtpSessionFactory sf() {
            DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
            sf.setHost("10.0.0.3");
            sf.setUsername("ftptest");
            sf.setPassword("ftptest");
            return sf;
        }
    
        public static class CSF extends CachingSessionFactory<FTPFile> {
    
            private boolean destroyCalled;
    
            public CSF(SessionFactory<FTPFile> sessionFactory) {
                super(sessionFactory);
            }
    
            @Override
            public void destroy() {
                this.destroyCalled = true;
                super.destroy();
            }
    
        }
    
    }
    

    log...

    16:15:38.898 [task-scheduler-5] DEBUG o.s.i.f.i.FtpInboundFileSynchronizer - 0 files transferred
    16:15:38.898 [task-scheduler-5] DEBUG o.s.i.e.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
    16:15:39.900 [task-scheduler-3] DEBUG o.s.integration.util.SimplePool - Obtained org.springframework.integration.ftp.session.FtpSession@149a806 from pool.
    16:15:39.903 [task-scheduler-3] DEBUG o.s.i.f.r.s.CachingSessionFactory - Releasing Session org.springframework.integration.ftp.session.FtpSession@149a806 back to the pool.
    16:15:39.903 [task-scheduler-3] DEBUG o.s.integration.util.SimplePool - Releasing org.springframework.integration.ftp.session.FtpSession@149a806 back to the pool
    16:15:39.903 [task-scheduler-3] DEBUG o.s.i.f.i.FtpInboundFileSynchronizer - 0 files transferred
    16:15:39.903 [task-scheduler-3] DEBUG o.s.i.e.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
    removing flow
    16:15:40.756 [main] INFO  o.s.i.e.SourcePollingChannelAdapter - stopped org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0
    16:15:40.757 [main] INFO  o.s.i.channel.DirectChannel - Channel 'application.foo.channel#0' has 0 subscriber(s).
    16:15:40.757 [main] INFO  o.s.i.endpoint.EventDrivenConsumer - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#0
    16:15:40.757 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Retrieved dependent beans for bean 'foo': [org.springframework.integration.ftp.inbound.FtpInboundFileSynchronizer#0, org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0, org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0.source, foo.channel#0, com.example.So43916317Application$$Lambda$12/962287291#0, org.springframework.integration.config.ConsumerEndpointFactoryBean#0, foocom.example.So43916317Application$CSF#0]
    destroying csf
    16:15:40.757 [main] DEBUG o.s.integration.util.SimplePool - Removing org.springframework.integration.ftp.session.FtpSession@149a806 from the pool
    exiting
    16:15:50.761 [main] TRACE o.s.c.a.AnnotationConfigApplicationContext - Publishing event in org.springframework.context.annotation.AnnotationConfigApplicationContext@27c86f2d: org.springframework.boot.context.event.ApplicationReadyEvent[source=org.springframework.boot.SpringApplication@5c18016b]
    

    As you can see, the polling stops after the remove() and the session is closed by the destroy().

    EDIT2

    If you have autoStartup turned off on the registration, you have to start the flow via the registration...

    IntegrationFlowRegistration registration = this.context.registration(flow)
        .id("foo")
        .addBean(csf)
        .autoStartup(false)
        .register();
    ...
    registration.start();
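
    Putting the pieces together, a credential refresh could follow a remove, destroy, re-register, start sequence. The sketch below models only that ordering in plain Java, under the assumption that each flow owns its session factory. ManagedFlow and FlowRefresher are hypothetical names, and removeAndDestroy() stands in for context.remove(id) followed by csf.destroy().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical handle for a registered flow together with the session factory it owns. */
interface ManagedFlow {

    void start();

    /** Stands in for removing the flow from the context and destroying its session factory. */
    void removeAndDestroy();
}

/** Keeps one active flow per id and replaces it whenever the configuration changes. */
final class FlowRefresher<C> {

    private final Map<String, ManagedFlow> active = new ConcurrentHashMap<>();
    private final Function<C, ManagedFlow> builder;

    FlowRefresher(Function<C, ManagedFlow> builder) {
        this.builder = builder;
    }

    /** Tears down any flow registered under the id, then builds and starts one from the new config. */
    synchronized void refresh(String id, C config) {
        ManagedFlow old = this.active.remove(id);
        if (old != null) {
            old.removeAndDestroy();
        }
        ManagedFlow fresh = this.builder.apply(config);
        this.active.put(id, fresh);
        fresh.start();
    }
}
```

    The builder function is where the new session factory and the flow would be created and registered; the old flow is always torn down before its replacement starts polling.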