Search code examples
spring-boot spring-integration messaging spring-integration-dsl

Spring Integration file splitting sample doesn't react to content coming into inbound adapter channel


I'm testing out the following setup, taken from the Spring Integration (SI) samples, to understand it and then adapt it to a similar set of tasks. I won't need the mailing functionality, but the rest of it seems adaptable enough to experiment with.

DailyCashReportApplication.java

@Slf4j
@SpringBootApplication
@PropertySource(value = "classpath:feeds-config.yml", factory = com.pru.globalpayments.feeds.downstream.YamlPropertySourceFactory.class)
public class DailyCashReportApplication  implements CommandLineRunner  {
    
    
    private final static String EMAIL_SUCCESS_SUFFIX = "emailSuccessSuffix";
    
    // sftp

    @Value("${sftp.in.host}")
    @Getter
    private String host;

    @Value("${sftp.in.port}")
    private int port;

    @Value("${sftp.in.user}")
    private String user;

    @Value("${sftp.in.privateKey}")
    @Getter
    private String privateKeyLocation;

    @Value("${sftp.in.remoteDir}")
    @Getter
    private String remoteDir;


    @Value("${sftp.in.localDir}")
    @Getter
    private String localDir;

    @Value("${sftp.in.chmod}")
    private String chmod;

    @Value("${sftp.in.maxFetchSize}")
    @Getter
    private int maxFetchSize;



    @Value("${sftp.in.file.filter}")
    private String fileFilter;

    @Autowired
    private ResourceLoader resourceLoader;

    
//  @Autowired
//  @Qualifier("syntheticRunner") //TODO: to be replaced by runners consuming from appropriate data sources
//  private AbstractRunner runner; 

    public static void main(String[] args) {
        log.info("DailyCashReportApplication running...");
    
        new SpringApplicationBuilder(DailyCashReportApplication.class).web(WebApplicationType.NONE).run(args);
        
    }

    @Override
    public void run(String... args) throws Exception {
        //runner.run(args);
    }

DcrConfig.java

@Configuration
public class DcrConfig {

    private final static String EMAIL_SUCCESS_SUFFIX = "emailSuccessSuffix";
    
    // sftp

    @Value("${sftp.in.host}")
    @Getter
    private String host;

    @Value("${sftp.in.port}")
    private int port;

    @Value("${sftp.in.user}")
    private String user;

    @Value("${sftp.in.privateKey}")
    @Getter
    private String privateKeyLocation;

    @Value("${sftp.in.remoteDir}")
    @Getter
    private String remoteDir;



    @Value("${sftp.in.localDir}")
    @Getter
    private String localDir;

    @Value("${sftp.in.chmod}")
    private String chmod;

    @Value("${sftp.in.maxFetchSize}")
    @Getter
    private int maxFetchSize;

//  @Value("${sftp.in.poller.fixedDelay}") @Getter
//  private int pollerFixedDelay;

    @Value("${sftp.in.file.filter}")
    private String fileFilter;

    @Autowired
    private ResourceLoader resourceLoader;
    
    @Bean(name = "downloadSftpSessionFactory")
    public SessionFactory<LsEntry> sftpSessionFactory() throws IOException {

        Resource keyFileResource = resourceLoader.getResource(privateKeyLocation);

        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUser(user);
        factory.setPrivateKey(keyFileResource);
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<LsEntry>(factory);
    }

    
    
    @Bean
    public IntegrationFlow fromFile() {
        return IntegrationFlows.from(Files.inboundAdapter(new File(remoteDir))
                                        .preventDuplicates(false)
                                        .patternFilter("*.txt"), 
                                        e -> e.poller(Pollers.fixedDelay(5000).errorChannel("tfrErrors.input"))
                                        .id("fileInboundChannelAdapter"))
                .handle(Files.splitter(true, true))
                .<Object, Class<?>>route(Object::getClass, m->m
                        .channelMapping(FileSplitter.FileMarker.class, "markers.input")
                        .channelMapping(String.class, "lines.input"))
                .get();
    }
    
    @Bean
    public FileWritingMessageHandlerSpec fileOut() {
        return Files.outboundAdapter(localDir)
                .appendNewLine(true)
                .fileNameExpression("payload.substring(1,4) + '.txt'");
    }
    
    @Bean
    public IntegrationFlow lines(FileWritingMessageHandler fileOut) {
        return f -> f.handle(fileOut); 
    }
    

    
    @Bean
    public IntegrationFlow markers() throws IOException{
        return f -> f.<FileSplitter.FileMarker>filter(m -> m.getMark().equals(FileSplitter.FileMarker.Mark.END),
                        e -> e.id("markerFilter"))
                .publishSubscribeChannel(s -> s

                        // first trigger file flushes
                        .subscribe(sf -> sf.transform("'tmp/out/.*\\.txt'", e -> e.id("toTriggerPattern"))
                                .trigger("fileOut", e -> e.id("flusher")))

                        // send the first file
                        .subscribe(sf -> {
                            try {
                                sf.<FileSplitter.FileMarker, File>transform(p -> new File("tmp/out/002.txt"))
                                        .enrichHeaders(h -> h.header(FileHeaders.FILENAME, "002.txt", true))
                                        .handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp002"));
                            } catch (IOException e1) {
                                // TODO Auto-generated catch block
                                e1.printStackTrace();
                            }
                        })

                        // send the second file
                        .subscribe(sf -> {
                            try {
                                sf.<FileSplitter.FileMarker, File>transform(p -> new File("/tmp/out/006.txt"))
                                        .enrichHeaders(h -> h.header(FileHeaders.FILENAME, "006.txt", true))
                                        .handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp006"));
                            } catch (IOException e1) {
                                // TODO Auto-generated catch block
                                e1.printStackTrace();
                            }
                        })

                        // send the third file
                        .subscribe(sf -> {
                            try {
                                sf.<FileSplitter.FileMarker, File>transform(p -> new File("/tmp/out/009.txt"))
                                        .enrichHeaders(h -> h.header(FileHeaders.FILENAME, "009.txt", true))
                                        .handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp009"));
                            } catch (IOException e1) {
                                // TODO Auto-generated catch block
                                e1.printStackTrace();
                            }
                        })


                        );
    }
    
    @Bean
    public IntegrationFlow tfrErrors() {
        return f -> f
//              .enrichHeaders(Mail.headers()
//                      .subject("File split and transfer failed")
//                      .from("foo@bar")
//                      .toFunction(m -> new String[] { "bar@baz"} ))
//              .enrichHeaders(h -> h.header(EMAIL_SUCCESS_SUFFIX, ".failed")
//                  .headerExpression(FileHeaders.ORIGINAL_FILE, "payload.failedMessage.headers['"
//                          + FileHeaders.ORIGINAL_FILE + "']"))
//              .<MessagingException, String>transform(p->p.getFailedMessage().getPayload()+"\n"+getStackTraceAsString(p))
                .channel("toMail.input");
    }
    
    
    private String getStackTraceAsString(Throwable cause) {
        StringWriter stringWriter = new StringWriter();
        PrintWriter printWriter = new PrintWriter(stringWriter, true);
        cause.printStackTrace(printWriter);
        return stringWriter.getBuffer().toString();
    }
    

    
    @Bean
    public SessionFactory<LsEntry> ftp() throws IOException {

        Resource keyFileResource = resourceLoader.getResource(privateKeyLocation);

//      File keyFile = keyFileResource.getFile();
//      try (BufferedReader br = new BufferedReader(new FileReader(keyFile))) {
//             String line;
//             while ((line = br.readLine()) != null) {
//                 System.out.println(line);
//             }
//          }
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUser(user);
        factory.setPrivateKey(keyFileResource);
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<LsEntry>(factory);
    }
    
    @Bean
    public MethodInterceptor afterMailAdvice() {
        return invocation -> {
            Message<?> message = (Message<?>) invocation.getArguments()[0];
            MessageHeaders headers = message.getHeaders();
            File originalFile = headers.get(FileHeaders.ORIGINAL_FILE, File.class);
            try {
                invocation.proceed();
                originalFile.renameTo(new File(originalFile.getAbsolutePath() + headers.get(EMAIL_SUCCESS_SUFFIX)));
            }
            catch(Exception e) {
                originalFile.renameTo(new File(originalFile.getAbsolutePath() + headers.get(EMAIL_SUCCESS_SUFFIX)+"email.failed"));
            }
            return null;
        };
    }
    
}

application-dev.yml

# Development settings for the daily cash report application.
spring:
  application:
    name: daily-cash-report-application

# The sftp.in.* keys below are the ones bound through @Value in
# DailyCashReportApplication and DcrConfig.
sftp:
  in:
    host: eafdev
    port: 22
    user: eafdev
    # Used by DcrConfig.fromFile() as new File(remoteDir), i.e. polled as a local directory.
    remoteDir: tmp/in 
    tmpDir: 
    # Directory handed to Files.outboundAdapter(...) in DcrConfig.fileOut().
    localDir: tmp/out 
    maxFetchSize: 1    
    # Resolved through ResourceLoader.getResource(...) when the SFTP session factories are built.
    privateKey: file:///C:/Users/x12345/AppData/Roaming/SSH/UserKeys/distributessh
    chmod: 664 
    poller:
      # The @Value binding for this key is commented out in DcrConfig;
      # fromFile() polls with a hard-coded 5000 ms delay.
      fixedDelay: 10000
    file:
      # Injected into DcrConfig.fileFilter, but fromFile() filters on the hard-coded "*.txt".
      filter: A.B.*    #TODO: calibrate once spec is known
  # NOTE(review): no code shown with this file reads sftp.out.* - confirm it is still needed.
  out:
    host: eafdev
    port: 22
    user: eafdev
    remoteDir: tmp/out  
    privateKey: file:///C:/Users/x12345/AppData/Roaming/SSH/UserKeys/distributessh
    chmod: 664 
  
# NOTE(review): no code shown with this file reads file.out.* - confirm it is still needed.
file: 
  out:   
    targetDir: 
    tmpDir: 

DailyCashReportApplicationTest.java

@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@SpringIntegrationTest(noAutoStartup = "fileInboundChannelAdapter")
@SpringJUnitConfig(classes = { 
        DailyCashReportApplication.class,
        DcrConfig.class,
        })
@EnableConfigurationProperties
@PropertySource(value = "application-dev.yml", factory = YamlPropertySourceFactory.class)
@Slf4j
class DailyCashReportApplicationTest {

    @Test
    @SneakyThrows
    void test()  {
        
        File in = new File("tmp/in/", "foo");
        FileOutputStream fos = new FileOutputStream(in);
        fos.write("*002,foo,bar\n*006,baz,qux\n*009,fiz,buz\n".getBytes());
        fos.close();
        in.renameTo(new File("tmp/in/", "foo.txt"));
        
        File out = new File("tmp/out/002.txt");
        int n = 0;
        while (n++ < 100 && (!out.exists() || out.length() < 12)) {
            Thread.sleep(100);
        }
        assertThat(out.exists()).isTrue();
        
    }

}

The above test fails: it seems that the files placed in the tmp/in directory are not picked up by the inbound file channel adapter, and hence the tmp/out folder stays empty.

What is missing? Is there a way to attach some logging abilities to see what's going on in the messaging infrastructure more clearly?

Perhaps the following line in the test is the culprit that disables the needed behavior:

@SpringIntegrationTest(noAutoStartup = "fileInboundChannelAdapter")

Not sure why it's there but commenting it out results in the following error:

Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'tmp' cannot be found on object of type 'org.springframework.messaging.support.GenericMessage' - maybe not public or not valid?
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:223)

Update: It looks like the single quotes in the SpEL expression matter; changing the bean to the following seems to make it send the files through the pipeline:

    /**
     * Outbound file adapter whose target directory is given as a SpEL string literal: the
     * configured {@code localDir} is wrapped in single quotes before it is handed to the factory.
     * Each payload is written, followed by a newline, to a file named after characters 1-3 of
     * the payload plus {@code .txt}.
     */
    @Bean
    public FileWritingMessageHandlerSpec fileOut() {
        final String quotedDirectory = "'" + localDir + "'";
        final String nameExpression = "payload.substring(1,4) + '.txt'";
        return Files.outboundAdapter(quotedDirectory)
                .appendNewLine(true)
                .fileNameExpression(nameExpression);
    }

Also, what's the interpretation of the following expression in the transformer:

"'tmp/out/.*\\.txt'" 

Sounds like a wildcard for all text files in that specific directory but why does one need all these extra dots and slashes there? Is it described in detail in some documentation?

Thanks.


Solution

  • See Javadoc of that factory:

    /**
     * Create a {@link FileWritingMessageHandlerSpec} builder for the one-way {@code FileWritingMessageHandler}.
     * @param directoryExpression the SpEL expression to evaluate target directory for writing files.
     * @return the {@link FileWritingMessageHandlerSpec} instance.
     */
    public static FileWritingMessageHandlerSpec outboundAdapter(String directoryExpression) {
    

    So that argument is parsed as a SpEL expression; since you rely on a literal directory name, it has to be quoted.

    That part of code you are talking about:

                        // first trigger file flushes
                        .subscribe(sf -> sf.transform("'/tmp/out/.*\\.txt'", e -> e.id("toTriggerPattern"))
                                .trigger("fileOut", e -> e.id("flusher")))
    

    is for a MessageTriggerAction implemented on the FileWritingMessageHandler. Also see the trigger() and FileWritingMessageHandler Javadocs and the docs: https://docs.spring.io/spring-integration/reference/file/writing.html#file-flushing

    Essentially, the transformer supplies the pattern that the DefaultFlushPredicate in the FileWritingMessageHandler needs.