Create multiple beans of SftpInboundFileSynchronizingMessageSource dynamically with InboundChannelAdapter


I am using a Spring Integration inbound channel adapter to poll files from an SFTP server. The application needs to poll multiple directories on a single SFTP server. Since an inbound channel adapter cannot poll multiple directories, I tried creating multiple beans of the same type with different values. Because the number of directories may grow in the future, I want to control it from application properties and register the beans dynamically.

My code -

@Override
 public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

   beanFactory.registerSingleton("sftpSessionFactory", sftpSessionFactory(host, port, user, password));
   beanFactory.registerSingleton("sftpInboundFileSynchronizer",
       sftpInboundFileSynchronizer((SessionFactory) beanFactory.getBean("sftpSessionFactory")));
 }
  public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory(String host, String port, String user, String password) {

    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setHost(host);
    factory.setPort(Integer.parseInt(port));
    factory.setUser(user);
    factory.setPassword(password);
    factory.setAllowUnknownKeys(true);

    return new CachingSessionFactory<>(factory);
  }
  private SftpInboundFileSynchronizer sftpInboundFileSynchronizer(SessionFactory sessionFactory) {

    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sessionFactory);
    fileSynchronizer.setDeleteRemoteFiles(true);
    fileSynchronizer.setPreserveTimestamp(true);
    fileSynchronizer.setRemoteDirectory("/mydir/subdir");
    fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.pdf"));

    return fileSynchronizer;
  }
  @Bean
  @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "2000"))
  public MessageSource<File> sftpMessageSource(String s) {
    SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
        (AbstractInboundFileSynchronizer<ChannelSftp.LsEntry>) applicationContext.getBean("sftpInboundFileSynchronizer"));
    source.setLocalDirectory(new File("/dir/subdir"));
    source.setAutoCreateLocalDirectory(true);
    source.setLocalFilter(new AcceptOnceFileListFilter<>());
    source.setMaxFetchSize(Integer.parseInt(maxFetchSize));

    return source;
  }

  @Bean
  @ServiceActivator(inputChannel = "sftpChannel")
  public MessageHandler handler() {
    return message -> {
      LOGGER.info("Payload - {}", message.getPayload());
    };
  }

This code works fine. However, if I create sftpMessageSource dynamically, the @InboundChannelAdapter annotation is not processed. Please suggest a way to create the sftpMessageSource and handler beans dynamically and apply the respective annotations.

Update:

The following code worked:

  @PostConstruct
  void init() {
    int index = 0;
    for (String directory : directories) {
      index++;
      int finalI = index;
      IntegrationFlow flow = IntegrationFlows
          .from(Sftp.inboundAdapter(sftpSessionFactory())
                  .preserveTimestamp(true)
                  .remoteDirectory(directory)
                  .autoCreateLocalDirectory(true)
                  .localDirectory(new File("/" + directory))
                  .localFilter(new AcceptOnceFileListFilter<>())
                  .maxFetchSize(10)
                  .filter(new SftpSimplePatternFileListFilter("*.pdf"))
                  .deleteRemoteFiles(true),
              e -> e.id("sftpInboundAdapter" + finalI)
                  .autoStartup(true)
                  .poller(Pollers.fixedDelay(2000)))
          .handle(handler())
          .get();

      this.flowContext.registration(flow).register();
    }
  }
  @Bean
  public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {

    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setHost(host);
    factory.setPort(Integer.parseInt(port));
    factory.setUser(user);
    factory.setPassword(password);
    factory.setAllowUnknownKeys(true);

    return new CachingSessionFactory<>(factory);
  }
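
The loop above iterates over `directories` without showing where that list comes from. As a rough sketch in plain Java (no Spring involved), the list could be read from a comma-separated property and paired with the adapter ids the loop generates. The property key `sftp.remote.directories` and the class `SftpDirectoryConfig` are assumptions made up for this illustration; they do not appear in the original code. In a Spring Boot application the same value would more commonly be injected with `@Value` or `@ConfigurationProperties` rather than parsed by hand.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

class SftpDirectoryConfig {

    // Hypothetical property key; use whatever name your application.properties defines.
    static final String DIRECTORIES_KEY = "sftp.remote.directories";

    // Splits the comma-separated value, trimming blanks and dropping empty or duplicate entries.
    static List<String> parseDirectories(Properties props) {
        String raw = props.getProperty(DIRECTORIES_KEY, "");
        List<String> result = new ArrayList<>();
        for (String part : raw.split(",")) {
            String dir = part.trim();
            if (!dir.isEmpty() && !result.contains(dir)) {
                result.add(dir);
            }
        }
        return result;
    }

    // Mirrors the "sftpInboundAdapter" + index naming used in the loop above (index starts at 1).
    static Map<String, String> adapterIds(List<String> directories) {
        Map<String, String> ids = new LinkedHashMap<>();
        int index = 0;
        for (String directory : directories) {
            index++;
            ids.put("sftpInboundAdapter" + index, directory);
        }
        return ids;
    }

    // Loads properties from text so the sketch runs without a file on disk.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load("sftp.remote.directories=/inbound/invoices, /inbound/reports\n");
        adapterIds(parseDirectories(props))
                .forEach((id, dir) -> System.out.println(id + " -> " + dir));
    }
}
```

Dropping duplicates before registering matters here because each flow needs a unique adapter id, and two flows polling the same remote directory with `deleteRemoteFiles(true)` would compete for the same files.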

Solution

  • Annotations in Java are static: you can't add them at runtime to objects you create. In addition, the framework reads those annotations when the application context starts up. So what you are looking for is simply not possible in Java as a language.

    Consider switching to the Java DSL in Spring Integration so that you can use its "dynamic flows": https://docs.spring.io/spring-integration/docs/5.3.1.RELEASE/reference/html/dsl.html#java-dsl-runtime-flows.

    But please, first of all, study what Java can and cannot do.
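
To illustrate the point about annotations with plain Java (no Spring involved): an annotation is stored in the class metadata of a declared method, and a framework finds it by reflection. An object built at runtime, such as a lambda, has nowhere to carry one. This is only a minimal sketch; `Polled` and `AnnotationLookupDemo` are hypothetical names invented for it, with `Polled` standing in for something like @InboundChannelAdapter.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.function.Supplier;

// Hypothetical stand-in for an annotation such as @InboundChannelAdapter.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Polled {
    String channel();
}

class AnnotationLookupDemo {

    // Declared in source, so the annotation is compiled into the class file.
    @Polled(channel = "sftpChannel")
    public Supplier<String> declaredSource() {
        return () -> "declared";
    }

    // Returns the channel named by @Polled on the given public method, or null if it has none.
    static String channelOf(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            Polled polled = method.getAnnotation(Polled.class);
            return polled == null ? null : polled.channel();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No public method " + methodName, e);
        }
    }

    // True if any public method on the object's runtime class carries @Polled.
    static boolean hasPolledMethod(Object candidate) {
        for (Method method : candidate.getClass().getMethods()) {
            if (method.isAnnotationPresent(Polled.class)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // The annotation is found on the method that was declared with it.
        System.out.println(channelOf(AnnotationLookupDemo.class, "declaredSource"));

        // An object created at runtime has no annotated method to discover.
        Supplier<String> runtimeSource = () -> "runtime";
        System.out.println(hasPolledMethod(runtimeSource));
    }
}
```

The Java DSL avoids the problem because the adapter, poller, and handler are described by ordinary method calls, which can be executed in a loop at any time.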