I'm testing the following setup from the Spring Integration (SI) samples to understand it and then adapt it to a similar set of tasks. I won't need the mailing functionality, but the rest seems adaptable enough to experiment with.
DailyCashReportApplication.java
@Slf4j
@SpringBootApplication
@PropertySource(value = "classpath:feeds-config.yml", factory = com.pru.globalpayments.feeds.downstream.YamlPropertySourceFactory.class)
public class DailyCashReportApplication implements CommandLineRunner {
private final static String EMAIL_SUCCESS_SUFFIX = "emailSuccessSuffix";
// sftp
@Value("${sftp.in.host}")
@Getter
private String host;
@Value("${sftp.in.port}")
private int port;
@Value("${sftp.in.user}")
private String user;
@Value("${sftp.in.privateKey}")
@Getter
private String privateKeyLocation;
@Value("${sftp.in.remoteDir}")
@Getter
private String remoteDir;
@Value("${sftp.in.localDir}")
@Getter
private String localDir;
@Value("${sftp.in.chmod}")
private String chmod;
@Value("${sftp.in.maxFetchSize}")
@Getter
private int maxFetchSize;
@Value("${sftp.in.file.filter}")
private String fileFilter;
@Autowired
private ResourceLoader resourceLoader;
// @Autowired
// @Qualifier("syntheticRunner") //TODO: to be replaced by runners consuming from appropriate data sources
// private AbstractRunner runner;
public static void main(String[] args) {
log.info("DailyCashReportApplication running...");
new SpringApplicationBuilder(DailyCashReportApplication.class).web(WebApplicationType.NONE).run(args);
}
@Override
public void run(String... args) throws Exception {
//runner.run(args);
}
DcrConfig.java
@Configuration
public class DcrConfig {
private final static String EMAIL_SUCCESS_SUFFIX = "emailSuccessSuffix";
// sftp
@Value("${sftp.in.host}")
@Getter
private String host;
@Value("${sftp.in.port}")
private int port;
@Value("${sftp.in.user}")
private String user;
@Value("${sftp.in.privateKey}")
@Getter
private String privateKeyLocation;
@Value("${sftp.in.remoteDir}")
@Getter
private String remoteDir;
@Value("${sftp.in.localDir}")
@Getter
private String localDir;
@Value("${sftp.in.chmod}")
private String chmod;
@Value("${sftp.in.maxFetchSize}")
@Getter
private int maxFetchSize;
// @Value("${sftp.in.poller.fixedDelay}") @Getter
// private int pollerFixedDelay;
@Value("${sftp.in.file.filter}")
private String fileFilter;
@Autowired
private ResourceLoader resourceLoader;
@Bean(name = "downloadSftpSessionFactory")
public SessionFactory<LsEntry> sftpSessionFactory() throws IOException {
Resource keyFileResource = resourceLoader.getResource(privateKeyLocation);
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPrivateKey(keyFileResource);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<LsEntry>(factory);
}
@Bean
public IntegrationFlow fromFile() {
return IntegrationFlows.from(Files.inboundAdapter(new File(remoteDir))
.preventDuplicates(false)
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(5000).errorChannel("tfrErrors.input"))
.id("fileInboundChannelAdapter"))
.handle(Files.splitter(true, true))
.<Object, Class<?>>route(Object::getClass, m->m
.channelMapping(FileSplitter.FileMarker.class, "markers.input")
.channelMapping(String.class, "lines.input"))
.get();
}
@Bean
public FileWritingMessageHandlerSpec fileOut() {
return Files.outboundAdapter(localDir)
.appendNewLine(true)
.fileNameExpression("payload.substring(1,4) + '.txt'");
}
@Bean
public IntegrationFlow lines(FileWritingMessageHandler fileOut) {
return f -> f.handle(fileOut);
}
@Bean
public IntegrationFlow markers() throws IOException{
return f -> f.<FileSplitter.FileMarker>filter(m -> m.getMark().equals(FileSplitter.FileMarker.Mark.END),
e -> e.id("markerFilter"))
.publishSubscribeChannel(s -> s
// first trigger file flushes
.subscribe(sf -> sf.transform("'tmp/out/.*\\.txt'", e -> e.id("toTriggerPattern"))
.trigger("fileOut", e -> e.id("flusher")))
// send the first file
.subscribe(sf -> {
try {
sf.<FileSplitter.FileMarker, File>transform(p -> new File("tmp/out/002.txt"))
.enrichHeaders(h -> h.header(FileHeaders.FILENAME, "002.txt", true))
.handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp002"));
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
})
// send the second file
.subscribe(sf -> {
try {
sf.<FileSplitter.FileMarker, File>transform(p -> new File("tmp/out/006.txt"))
.enrichHeaders(h -> h.header(FileHeaders.FILENAME, "006.txt", true))
.handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp006"));
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
})
// send the third file
.subscribe(sf -> {
try {
sf.<FileSplitter.FileMarker, File>transform(p -> new File("tmp/out/009.txt"))
.enrichHeaders(h -> h.header(FileHeaders.FILENAME, "009.txt", true))
.handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp009"));
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
})
);
}
@Bean
public IntegrationFlow tfrErrors() {
return f -> f
// .enrichHeaders(Mail.headers()
// .subject("File split and transfer failed")
// .from("foo@bar")
// .toFunction(m -> new String[] { "bar@baz"} ))
// .enrichHeaders(h -> h.header(EMAIL_SUCCESS_SUFFIX, ".failed")
// .headerExpression(FileHeaders.ORIGINAL_FILE, "payload.failedMessage.headers['"
// + FileHeaders.ORIGINAL_FILE + "']"))
// .<MessagingException, String>transform(p->p.getFailedMessage().getPayload()+"\n"+getStackTraceAsString(p))
.channel("toMail.input");
}
private String getStackTraceAsString(Throwable cause) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter, true);
cause.printStackTrace(printWriter);
return stringWriter.getBuffer().toString();
}
@Bean
public SessionFactory<LsEntry> ftp() throws IOException {
Resource keyFileResource = resourceLoader.getResource(privateKeyLocation);
// File keyFile = keyFileResource.getFile();
// try (BufferedReader br = new BufferedReader(new FileReader(keyFile))) {
// String line;
// while ((line = br.readLine()) != null) {
// System.out.println(line);
// }
// }
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPrivateKey(keyFileResource);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<LsEntry>(factory);
}
@Bean
public MethodInterceptor afterMailAdvice() {
return invocation -> {
Message<?> message = (Message<?>) invocation.getArguments()[0];
MessageHeaders headers = message.getHeaders();
File originalFile = headers.get(FileHeaders.ORIGINAL_FILE, File.class);
try {
invocation.proceed();
originalFile.renameTo(new File(originalFile.getAbsolutePath() + headers.get(EMAIL_SUCCESS_SUFFIX)));
}
catch(Exception e) {
originalFile.renameTo(new File(originalFile.getAbsolutePath() + headers.get(EMAIL_SUCCESS_SUFFIX)+"email.failed"));
}
return null;
};
}
}
application-dev.yml
spring:
  application:
    name: daily-cash-report-application
sftp:
  in:
    host: eafdev
    port: 22
    user: eafdev
    remoteDir: tmp/in
    tmpDir:
    localDir: tmp/out
    maxFetchSize: 1
    privateKey: file:///C:/Users/x12345/AppData/Roaming/SSH/UserKeys/distributessh
    chmod: 664
    poller:
      fixedDelay: 10000
    file:
      filter: A.B.* #TODO: calibrate once spec is known
  out:
    host: eafdev
    port: 22
    user: eafdev
    remoteDir: tmp/out
    privateKey: file:///C:/Users/x12345/AppData/Roaming/SSH/UserKeys/distributessh
    chmod: 664
file:
  out:
    targetDir:
    tmpDir:
DailyCashReportApplicationTest.java
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@SpringIntegrationTest(noAutoStartup = "fileInboundChannelAdapter")
@SpringJUnitConfig(classes = {
DailyCashReportApplication.class,
DcrConfig.class,
})
@EnableConfigurationProperties
@PropertySource(value = "application-dev.yml", factory = YamlPropertySourceFactory.class)
@Slf4j
class DailyCashReportApplicationTest {
@Test
@SneakyThrows
void test() {
File in = new File("tmp/in/", "foo");
FileOutputStream fos = new FileOutputStream(in);
fos.write("*002,foo,bar\n*006,baz,qux\n*009,fiz,buz\n".getBytes());
fos.close();
in.renameTo(new File("tmp/in/", "foo.txt"));
File out = new File("tmp/out/002.txt");
int n = 0;
while (n++ < 100 && (!out.exists() || out.length() < 12)) {
Thread.sleep(100);
}
assertThat(out.exists()).isTrue();
}
}
The above test fails: the files placed into the tmp/in directory do not seem to get picked up by the inbound file channel adapter, and hence the tmp/out folder stays empty.
What is missing? Is there a way to attach some logging to see more clearly what is going on in the messaging infrastructure?
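One simple way to watch the messaging infrastructure (my own addition, not part of the sample) is to raise the log level for the Spring Integration packages in the active profile's YAML; DEBUG-level logging shows polls, channel sends, and handler invocations:

```yaml
# Hypothetical addition to application-dev.yml
logging:
  level:
    org.springframework.integration: DEBUG
```

The Java DSL also has a .log() operator that can be dropped into a flow to wire-tap messages at a specific point.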
Perhaps the following line in the test is the culprit that disables the needed behavior:
@SpringIntegrationTest(noAutoStartup = "fileInboundChannelAdapter")
I'm not sure why it's there, but commenting it out results in the following error:
Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'tmp' cannot be found on object of type 'org.springframework.messaging.support.GenericMessage' - maybe not public or not valid?
at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:223)
Update: it looks like those single quotes in the SpEL expression matter; changing the bean to the following makes the files go through the pipeline:
@Bean
public FileWritingMessageHandlerSpec fileOut() {
return Files.outboundAdapter("'"+localDir+"'")
.appendNewLine(true)
.fileNameExpression("payload.substring(1,4) + '.txt'");
}
Also, what's the interpretation of the following expression in the transformer:
"'tmp/out/.*\\.txt'"
It looks like a wildcard for all text files in that specific directory, but why are all the extra dots and backslashes needed? Is this described in detail in some documentation?
Thanks.
See the Javadoc of that factory method:
/**
* Create a {@link FileWritingMessageHandlerSpec} builder for the one-way {@code FileWritingMessageHandler}.
* @param directoryExpression the SpEL expression to evaluate target directory for writing files.
* @return the {@link FileWritingMessageHandlerSpec} instance.
*/
public static FileWritingMessageHandlerSpec outboundAdapter(String directoryExpression) {
So, that argument is parsed as a SpEL expression; since the directory name here is a literal, it has to be quoted. Unquoted, tmp/out is evaluated as property navigation against the message, which is exactly the EL1008E error you saw above.
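If the quoting feels error-prone, the spring-integration-file Java DSL also has (to my knowledge) a File-based overload, so the directory can be passed directly and SpEL avoided altogether; a sketch, not the sample's code:

```java
// Files.outboundAdapter(File) takes the target directory object itself,
// so no SpEL quoting of the path is needed.
@Bean
public FileWritingMessageHandlerSpec fileOut() {
    return Files.outboundAdapter(new File(localDir))
            .appendNewLine(true)
            .fileNameExpression("payload.substring(1, 4) + '.txt'");
}
```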
The part of the code you are asking about:
// first trigger file flushes
.subscribe(sf -> sf.transform("'/tmp/out/.*\\.txt'", e -> e.id("toTriggerPattern"))
.trigger("fileOut", e -> e.id("flusher")))
is for a MessageTriggerAction implemented by the FileWritingMessageHandler. Also see the trigger() and FileWritingMessageHandler Javadocs and the reference docs: https://docs.spring.io/spring-integration/reference/file/writing.html#file-flushing
Essentially, that transformer produces the regular-expression pattern that the DefaultFlushPredicate in the FileWritingMessageHandler matches against the paths of files pending a flush: only the matching files get flushed.
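As for the pattern itself: after SpEL strips the outer single quotes, what remains is an ordinary Java regular expression, not a shell glob. A JDK-only sketch of how such a pattern matches file paths (the class name is made up for the demo):

```java
import java.util.regex.Pattern;

public class FlushPatternDemo {
    public static void main(String[] args) {
        // SpEL sees '/tmp/out/.*\.txt' and yields the literal String /tmp/out/.*\.txt;
        // in Java source every backslash is doubled, hence "\\.".
        // As a regex: "." matches any single character, ".*" any run of
        // characters, and "\." a literal dot -- so this matches any
        // path under /tmp/out/ ending in ".txt".
        Pattern p = Pattern.compile("/tmp/out/.*\\.txt");
        System.out.println(p.matcher("/tmp/out/002.txt").matches()); // true
        System.out.println(p.matcher("/tmp/out/002.csv").matches()); // false
    }
}
```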