Tags: java, spring, spring-integration, spring-integration-dsl, spring-integration-sftp

Configured an SFTP inbound channel to get a file from an SFTP server, but my test case does not bring it to life


I want to get a file from an SFTP server and transform its JSON content into an array of instances of one of my classes. Therefore I thought I would use Spring Integration within a Spring Boot application, and I would be happy to have one of the IntegrationFlows fluently programmed to achieve that.

I have dug through many articles, questions, and answers here on SO, but nothing really helped, and I have been struggling with this for weeks. The Spring Integration documentation is itself hard to understand. It covers many aspects that differ between Spring versions and between programming paradigms (XML, Java configuration, and Java DSL), which adds complexity, makes it even harder to decide which of the rare examples to follow, and is ultimately no help for a newbie. A newbie wants to find his problem and be guided to the currently best way to solve it, with the whys and the pros and cons explained. With some luck, he finds his problem in one of the examples (getting a file from SFTP for further processing is not an exotic task), and the solution is an example he can copy and paste with minimal adaptations. I have not been so lucky so far.

One question here on SO came close to what I probably need: it configured an sftpInboundFileSynchronizer together with a MessageSource and a MessageHandler. The asker wanted to do roughly the same as I do. The answer was "That code is doing the reverse (sending to SFTP)." which left me flabbergasted and wondering whether I had fundamentally mixed up inbound and outbound.

I think I have to use an InboundAdapter based on a SftpSessionFactory. I put the SessionFactory into an extra configuration because I plan to use it with other adapters as well:

@Configuration
@ConfigurationProperties("test.job.common")
public class JobCommonConfiguration {
    private static final Logger LOG = LoggerFactory.getLogger(JobCommonConfiguration.class);

    private String hostname;
    private int port;
    private String username;
    private String password;

    @Bean
    public SessionFactory<LsEntry> sftpTest3SessionFactory() {
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost(hostname);
        sf.setPort(port);
        sf.setUser(username);
        sf.setPassword(password);
//      factory.setTestSession(true);
        return new CachingSessionFactory<LsEntry>(sf);
    }

    /* getters & setters */

}

The second configuration sets up an SftpInboundAdapter, assuming my understanding of "inbound" is correct, followed by a Spring Integration transformer that converts the JSON into an array of my Event instances. In the end, the instances should be sent in a request to an HTTP REST service, which I could perhaps include in the flow as well. It is as follows:

@Configuration
@ConfigurationProperties("test.job.transfer3")
@Import({ JobCommonConfiguration.class })
public class Job3Configuration {
    private static final Logger LOG = LoggerFactory.getLogger(Job3Configuration.class);

    private boolean enabled = false;
    private String remoteDir;
    private String remoteFile;
    private String remoteFilePattern;
    private boolean remoteRemove;
    private String localDir;
    private String localFile;

    @Autowired
    private SessionFactory<LsEntry> sftpTest3SessionFactory;

    @Bean
    public FireOnceTrigger fireOnceTest3Trigger() {
        return new FireOnceTrigger();
    }

    @Bean
    public IntegrationFlow test3SftpInboundFlow() {
        return IntegrationFlows
            .from(Sftp.inboundAdapter(sftpTest3SessionFactory)
                .preserveTimestamp(true)
                .remoteDirectory(remoteDir)
                .regexFilter(remoteFilePattern)
                .localFilenameExpression(localFile)
                .localDirectory(new File(localDir)),
            e -> e.id("sftpTest3InboundAdapter")
                .autoStartup(true)
                .poller(Pollers.trigger(fireOnceTest3Trigger()))
            )
            .transform(Transformers.fromJson(Event[].class))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }

    /* getters & setters */
}

My entity Event is quite simple:

public class Event {

    private Integer crmId;
    private String eventType;
    private LocalDateTime dateFrom;
    private LocalDateTime dateTo;

    /* getters & setter & toString() */
}

My test does not test anything yet, because nothing has happened so far. Eventually it should assert that I received the correct number of Events. It looks as follows:

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes=SftpTransferTestApplication.class)
@Import(Job3Configuration.class)
public class GetFileIntegrationTest {
    private static final Logger LOG = LoggerFactory.getLogger(GetFileIntegrationTest.class);

    @Test
    public void testGetFile() throws Exception {
        LOG.info("GetIntegrationTest testgetFile");

//      assertThat(fileReceived, is(sb.toString()));
    }
}

The test application is straightforward:

@SpringBootApplication
@EnableIntegration
@IntegrationComponentScan
public class SftpTransferTestApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpTransferTestApplication.class).web(
            NONE).run(args);
    }
}

My project has as parent spring-boot-starter-parent with version 2.3.0.RELEASE and uses spring-integration-sftp with version 5.3.0.RELEASE.

Please help me bring this test case to life. What am I doing wrong?

How can I add logging to the IntegrationFlow to see more of what is (or is not) happening?

EDIT 1

I tried to strip down my code a bit to rule out configuration problems:

@Configuration
public class JobConfiguration {
    private static final Logger LOG = LoggerFactory.getLogger(JobConfiguration.class);
    @Bean
    public TransferChannel getTransferChannel() {
        TransferChannel channel = new TransferChannel();
        channel.setHost("myHost");
        channel.setPort(0);
        channel.setUser("test");
        channel.setPassword("xxx");
        return channel;
    }

    @Bean
    public TransferContext getTransferContext() {
        TransferContext context = new TransferContext();
        context.setEnabled(false);
        context.setChannel(getTransferChannel());
        context.setRemoteDir("data");
        context.setRemoteFilename("GetMessage3.json");
        context.setRemoteFilenameFilter("GetMessage\\.json$");
        context.setLocalDir("sftp-inbound");
        context.setLocalFile("GetMessage3.json");
        return context;
    }

    @Bean
    public SessionFactory<LsEntry> getSftpTestSessionFactory(TransferChannel transferChannel) {
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost(transferChannel.getHost());
        sf.setPort(transferChannel.getPort());
        sf.setUser(transferChannel.getUser());
        sf.setPassword(transferChannel.getPassword());
//      factory.setTestSession(true);
        return new CachingSessionFactory<LsEntry>(sf);
    }

    @Bean
    public FireOnceTrigger fireOnceTestTrigger() {
        return new FireOnceTrigger();
    }

    @Bean
    public IntegrationFlow testSftpInboundFlow(TransferContext context) {
        return IntegrationFlows
            .from(Sftp.inboundAdapter(getSftpTestSessionFactory(context.getChannel()))
                    .preserveTimestamp(true)
                    .remoteDirectory(context.getRemoteDir())
                    .regexFilter(context.getRemoteFilenameFilter())
                    .localFilenameExpression(context.getLocalFile())
                    .localDirectory(new File(context.getLocalDir())),
                e -> e.id("sftpTestInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.trigger(fireOnceTestTrigger()))
//                  .poller(Pollers.fixedDelay(5000))
                )
            .transform(Transformers.fromJson(Event[].class))
            .channel("sftpTestMessageChannel")
//          .logAndReply(Level.DEBUG);
//          .handle(m -> System.out.println("myHandler: " + m.getPayload().toString()))
            .get();
    }

    @Bean("someService.handler")             
    @EndpointId("someService")               
    @ServiceActivator(inputChannel = "sftpTestMessageChannel")
    public MessageHandler someHandler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println("myThirdHandler: " + message.getPayload());
                System.out.println("myThirdHandler: " + message.getHeaders());
                Event ev = ((Event[]) message.getPayload())[1];
                System.out.println("myThirdHandler: " + ev);
                throw new IllegalStateException("Want to see the next MessageHandler");
            }
        };
    }
}
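
A side note on the filename filter in the configuration above, independent of whatever else is going on: the pattern "GetMessage\\.json$" requires the literal dot to follow "GetMessage" directly, so it cannot match the remote file name "GetMessage3.json". The standalone check below uses only java.util.regex and assumes the adapter's regex filter is applied to the bare remote file name. The class name RegexFilterCheck and the alternative pattern are made up for this illustration and are not part of the original code.

```java
import java.util.regex.Pattern;

public class RegexFilterCheck {

    // Returns true if the whole file name matches the regular expression.
    public static boolean accepts(String regex, String fileName) {
        return Pattern.compile(regex).matcher(fileName).matches();
    }

    public static void main(String[] args) {
        String fileName = "GetMessage3.json";

        // Pattern from the configuration: no room for the digit before ".json".
        System.out.println(accepts("GetMessage\\.json$", fileName));    // prints false

        // Hypothetical alternative that allows optional digits before ".json".
        System.out.println(accepts("GetMessage\\d*\\.json", fileName)); // prints true
    }
}
```

If the filter rejects every remote file, the inbound adapter has nothing to transfer, so it may be worth verifying the pattern against the real file names before looking elsewhere.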

To make the SFTP implementation JSch a bit more talkative, I have a configuration file "application-junit.yaml":

logging:
  level:
    org.springframework.integration: debug
    ### filter warning
    ### org.springframework.integration.expression.ExpressionUtils:
    ### Creating EvaluationContext with no beanFactory
    org.springframework.integration.expression.ExpressionUtils: error
    ### filter info
    ###    because jsch is very talkative
    com.jcraft.jsch: debug
    com.harry.potter: debug
    com.harry.potter.filetransfer: trace

I can't get it to work. Logging output is:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.3.0.RELEASE)

2021-08-22 23:29:27.958  INFO 9916 --- [           main] c.l.c.f.service.GetFileIntegrationTest   : Starting GetFileIntegrationTest on myClient with PID 9916 (started by test in C:\Users\test\git\tis\sftp-client)
2021-08-22 23:29:27.959 DEBUG 9916 --- [           main] c.l.c.f.service.GetFileIntegrationTest   : Running with Spring Boot v2.3.0.RELEASE, Spring v5.2.6.RELEASE
2021-08-22 23:29:27.959  INFO 9916 --- [           main] c.l.c.f.service.GetFileIntegrationTest   : No active profile set, falling back to default profiles: default
2021-08-22 23:29:28.752  INFO 9916 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-08-22 23:29:28.759  INFO 9916 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-08-22 23:29:28.763 DEBUG 9916 --- [           main] faultConfiguringBeanFactoryPostProcessor : SpEL function '#xpath' isn't registered: there is no spring-integration-xml.jar on the classpath.
2021-08-22 23:29:28.766  INFO 9916 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-08-22 23:29:28.837  INFO 9916 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-22 23:29:28.841  INFO 9916 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-22 23:29:28.873  INFO 9916 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-22 23:29:30.298  INFO 9916 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2021-08-22 23:29:30.802  INFO 9916 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
2021-08-22 23:29:30.894 DEBUG 9916 --- [           main] faultConfiguringBeanFactoryPostProcessor : 
Spring Integration global properties:
    
    spring.integration.endpoints.noAutoStartup=
    spring.integration.taskScheduler.poolSize=10
    spring.integration.channels.maxUnicastSubscribers=0x7fffffff
    spring.integration.channels.autoCreate=true
    spring.integration.channels.maxBroadcastSubscribers=0x7fffffff
    spring.integration.readOnly.headers=
    spring.integration.messagingTemplate.throwExceptionOnLateReply=false
  
2021-08-22 23:29:30.901 DEBUG 9916 --- [           main] .s.i.c.GlobalChannelInterceptorProcessor : No global channel interceptors.
2021-08-22 23:29:30.904  INFO 9916 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-08-22 23:29:30.904  INFO 9916 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2021-08-22 23:29:30.904  INFO 9916 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2021-08-22 23:29:30.905  INFO 9916 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {json-to-object-transformer} as a subscriber to the 'testSftpInboundFlow.channel#0' channel
2021-08-22 23:29:30.905  INFO 9916 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.testSftpInboundFlow.channel#0' has 1 subscriber(s).
2021-08-22 23:29:30.905  INFO 9916 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'testSftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/filetransfer/config/JobConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-08-22 23:29:30.905  INFO 9916 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler:someService} as a subscriber to the 'sftpTestMessageChannel' channel
2021-08-22 23:29:30.905  INFO 9916 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.sftpTestMessageChannel' has 1 subscriber(s).
2021-08-22 23:29:30.905  INFO 9916 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'someService'
2021-08-22 23:29:30.910  INFO 9916 --- [           main] o.s.i.e.SourcePollingChannelAdapter      : started bean 'sftpTestInboundAdapter'; defined in: 'class path resource     [com/harry/potter/filetransfer/config/JobConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-08-22 23:29:30.922  INFO 9916 --- [           main] c.l.c.f.service.GetFileIntegrationTest   : Started GetFileIntegrationTest in 3.323 seconds (JVM running for 4.13)
2021-08-22 23:29:30.935  INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch                          : Connecting to myHost port 22
2021-08-22 23:29:30.968  INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch                          : Connection established
2021-08-22 23:29:31.041  INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch                          : Remote version string: SSH-2.0-OpenSSH_for_Windows_8.1
2021-08-22 23:29:31.042  INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch                          : Local version string: SSH-2.0-JSCH-0.1.54
2021-08-22 23:29:31.042  INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch                          : CheckCiphers: aes256-ctr,aes192-ctr,aes128-ctr,aes256-cbc,aes192-cbc,aes128-cbc,3des-ctr,arcfour,arcfour128,arcfour256
2021-08-22 23:29:31.216  INFO 9916 --- [           main] c.l.c.f.service.GetFileIntegrationTest   : GetIntegrationTest testgetFile
2021-08-22 23:29:31.226  INFO 9916 --- [extShutdownHook] o.s.i.e.SourcePollingChannelAdapter      : stopped bean 'sftpTestInboundAdapter'; defined in: 'class path resource [com/harry/potter/filetransfer/config/JobConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : Removing {json-to-object-transformer} as a subscriber to the 'testSftpInboundFlow.channel#0' channel
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.integration.channel.DirectChannel    : Channel 'application.testSftpInboundFlow.channel#0' has 0 subscriber(s).
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'testSftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/filetransfer/config/JobConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 0 subscriber(s).
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : stopped bean '_org.springframework.integration.errorLogger'
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : Removing {message-handler:someService} as a subscriber to the 'sftpTestMessageChannel' channel
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.integration.channel.DirectChannel    : Channel 'application.sftpTestMessageChannel' has 0 subscriber(s).
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'someService'
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'
2021-08-22 23:29:31.228  INFO 9916 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'

I replaced the fireOnceTestTrigger with Pollers.fixedDelay(5000): still nothing.

What am I doing wrong?


Solution

  • The test exits and the JVM shuts down before the fetch completes.

    You need to wait for the file to be fetched. One way would be to add a channel interceptor in your test. Something like this:

    public class GetFileIntegrationTest {
        private static final Logger LOG = LoggerFactory.getLogger(GetFileIntegrationTest.class);
    
        @Autowired
        AbstractMessageChannel sftpTestMessageChannel;
    
        @Test
        public void testGetFile() throws Exception {
            LOG.info("GetIntegrationTest testgetFile");
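            CountDownLatch latch = new CountDownLatch(1);
            AtomicReference<Message<?>> received = new AtomicReference<>();
            this.sftpTestMessageChannel.addInterceptor(new ChannelInterceptor() {

                // preSend runs before the message is handed to the subscriber,
                // so the latch is released even if the handler throws.
                @Override
                public Message<?> preSend(Message<?> message, MessageChannel channel) {
                    received.set(message);   // capture the message for later assertions
                    latch.countDown();       // wake up the waiting test thread
                    return message;          // pass the message on unchanged
                }
            });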
    
            assertTrue(latch.await(10, TimeUnit.SECONDS));
    //      assertThat(fileReceived, is(sb.toString()));
        }
    }
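
    The waiting pattern itself can be tried without Spring. The sketch below uses only the JDK and stands in for the real setup: a scheduler thread plays the role of the poller, and the calling thread plays the role of the test method. The class name LatchWaitSketch, the method awaitDelivery, and the delays are assumptions made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LatchWaitSketch {

    // Simulates a poller thread that delivers one payload after delayMillis.
    // Returns the payload, or null if it did not arrive within timeoutMillis.
    public static String awaitDelivery(long delayMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> received = new AtomicReference<>();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // Background work, comparable to the fetch on the task-scheduler thread.
            scheduler.schedule(() -> {
                received.set("GetMessage3.json");
                latch.countDown();
            }, delayMillis, TimeUnit.MILLISECONDS);

            // The caller blocks here instead of returning immediately.
            boolean arrived = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return arrived ? received.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Waiting long enough: the payload arrives.
        System.out.println(awaitDelivery(200, 2000)); // prints GetMessage3.json
        // Giving up too early, like a test that returns at once: nothing arrives.
        System.out.println(awaitDelivery(2000, 100)); // prints null
    }
}
```

    The second call mirrors the log in the question: the connection is still being set up on the scheduler thread when the test method returns and the context shuts down.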