We are developing a Spring Integration flow using the Java DSL. The application reads a remote file and inserts its data into MongoDB. We stream the file line by line, and we need to bulk-insert the data into MongoDB. From my understanding of the Spring Integration documentation and samples, there is no bulk option for this, and I can't figure out how to implement the expected behaviour. We tried using an aggregator, but we didn't find a suitable solution for a fixed batch size.
Here is a sample of the involved beans:
@Configuration
public class SampleConfiguration {

    ...

    @Bean
    MessagingTemplate messagingTemplate(ApplicationContext context) {
        MessagingTemplate messagingTemplate = new MessagingTemplate();
        messagingTemplate.setBeanFactory(context);
        return messagingTemplate;
    }

    @Bean
    IntegrationFlow sftpSource() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost("localhost");
        factory.setPort(22);
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(factory);
        return IntegrationFlow
                .from(Sftp.inboundStreamingAdapter(template, Comparator.comparing(DirEntry::getFilename))
                                .remoteDirectory("upload")
                                .patternFilter("*.csv")
                                .maxFetchSize(1),
                        spec -> spec.poller(Pollers.fixedRate(Duration.ofMillis(1000)))
                                .autoStartup(true))
                .split(Files
                        .splitter()
                        .markers()
                        .charset(StandardCharsets.UTF_8)
                        .firstLineAsHeader("fileHeader")
                        .applySequence(true))
                .filter(payload -> !(payload instanceof FileSplitter.FileMarker))
                .enrichHeaders(h -> h.errorChannel("errorChannel"))
                .handle((String payload, MessageHeaders headers) -> {
                    String header = headers.get("fileHeader", String.class);
                    String rowWithHeader = header + "\n" + payload;
                    try (StringReader reader = new StringReader(rowWithHeader)) {
                        CsvToBean<MyPojo> beanReader = new CsvToBeanBuilder<MyPojo>(reader)
                                .withType(MyPojo.class)
                                .withSeparator(';')
                                .build();
                        return beanReader.iterator().next();
                    }
                })
                .handle(MongoDb
                        .outboundGateway(mongoTemplate)
                        .entityClass(MyPojo.class)
                        .collectionNameFunction(m -> "mypojo")
                        .collectionCallback(
                                (collection, message) -> {
                                    MyPojo myPojo = (MyPojo) message.getPayload();
                                    Document document = new Document();
                                    mongoTemplate.getConverter().write(myPojo, document);
                                    return collection.insertOne(document);
                                }))
                .channel("logChannel")
                .get();
    }

    @Bean
    IntegrationFlow logFiles() {
        return IntegrationFlow
                .from("logChannel")
                .handle(message -> log.info("message received: {}", message))
                .get();
    }

    @Bean
    IntegrationFlow logErrors() {
        return IntegrationFlow
                .from("errorChannel")
                .handle(message -> {
                    MessagingException exception = (MessagingException) message.getPayload();
                    log.error("error message received: {} for message {}", exception.getMessage(),
                            exception.getFailedMessage());
                })
                .get();
    }

    ...
}
Update - aggregation step
.aggregate(aggregatorSpec -> aggregatorSpec
        .correlationStrategy(message -> message
                .getHeaders()
                .get(FileHeaders.REMOTE_FILE))
        .releaseStrategy(group -> group.size() >= 5)
        .groupTimeout(200L)
        .sendPartialResultOnExpiry(true))
.handle(MongoDb...)
I think you need to look into a FileAggregator, which is able to collect lines into a list according to the FileMarkerReleaseStrategy: https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-aggregator.
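A minimal sketch of how it could slot into your flow (the channel name is a placeholder; the important part is that the FileMarker messages must reach the aggregator, so the filter discards them to the aggregator channel instead of dropping them):

.split(Files
        .splitter()
        .markers()
        .charset(StandardCharsets.UTF_8)
        .firstLineAsHeader("fileHeader")
        .applySequence(true))
// divert the markers to the aggregator instead of dropping them:
// the FileMarkerReleaseStrategy needs the END marker to release the group
.filter(payload -> !(payload instanceof FileSplitter.FileMarker),
        e -> e.discardChannel("aggregatorChannel"))
// ... your CSV-to-MyPojo handler, unchanged ...
.channel("aggregatorChannel")
// org.springframework.integration.file.aggregator.FileAggregator correlates
// by file name and releases all collected lines of a file as one List payload
.aggregate(new FileAggregator())
// the downstream handler now receives a List instead of a single MyPojo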
Then yes, you can cast the payload into a List in your collectionCallback() and perform a batch insert into MongoDB.
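For example, a sketch of your gateway adjusted for the aggregated List payload, using insertMany() for a single round trip instead of one insertOne() per row:

.handle(MongoDb
        .outboundGateway(mongoTemplate)
        .entityClass(MyPojo.class)
        .collectionNameFunction(m -> "mypojo")
        .collectionCallback((collection, message) -> {
            // the aggregator released a whole group, so the payload is a List
            List<?> payload = (List<?>) message.getPayload();
            List<Document> documents = new ArrayList<>();
            for (Object myPojo : payload) {
                Document document = new Document();
                mongoTemplate.getConverter().write(myPojo, document);
                documents.add(document);
            }
            // one bulk round trip to MongoDB
            return collection.insertMany(documents);
        }))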
UPDATE
To be able to chunk from the aggregator using the same correlation key (the file name), we need to remove the already-released group from the store. See the AggregatorSpec.expireGroupsUponCompletion() option and the respective docs: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#releasestrategy
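Applied to the aggregation step from your update, that would be:

.aggregate(aggregatorSpec -> aggregatorSpec
        .correlationStrategy(message -> message
                .getHeaders()
                .get(FileHeaders.REMOTE_FILE))
        .releaseStrategy(group -> group.size() >= 5)
        .groupTimeout(200L)
        .sendPartialResultOnExpiry(true)
        // remove the released group so the next lines of the same file
        // start a fresh group under the same correlation key
        .expireGroupsUponCompletion(true))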