Search code examples
spring-integrationspring-integration-dsl

How to handle bulk operations in Spring Integration


we are developing a Spring Integration Flow using Java DSL. This application reads from a remote File and inserts data in MongoDB. We are streaming the file lines and we need to bulk-insert data in MongoDB. From my understanding of the Spring Integration documentation and samples, there's no bulk option for this and I can't figure out how to implement the expected behaviour. We tried using Aggregation but we didn't find a suitable solution for fixed batch size.

Sample of the involved beans

@Configuration
public class SampleConfiguration {

  ...

  @Bean
  MessagingTemplate messagingTemplate(ApplicationContext context) {
    MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setBeanFactory(context);
    return messagingTemplate;
  }

  @Bean
  IntegrationFlow sftpSource() {
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
    factory.setHost("localhost");
    factory.setPort(22);
    factory.setUser("foo");
    factory.setPassword("foo");
    factory.setAllowUnknownKeys(true);
    SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(factory);
    return IntegrationFlow
        .from(Sftp.inboundStreamingAdapter(template, Comparator.comparing(DirEntry::getFilename))
                .remoteDirectory("upload")
                .patternFilter("*.csv")
                .maxFetchSize(1),
            spec -> spec.poller(Pollers.fixedRate(Duration.ofMillis(1000)))
                .autoStartup(true))
        .split(Files
            .splitter()
            .markers()
            .charset(StandardCharsets.UTF_8)
            .firstLineAsHeader("fileHeader")
            .applySequence(true))
        .filter(payload -> !(payload instanceof FileSplitter.FileMarker))
        .enrichHeaders(h -> h.errorChannel("errorChannel"))
        .handle((String payload, MessageHeaders headers) -> {
          String header = headers.get("fileHeader", String.class);
          String rowWithHeader = header + "\n" + payload;
          try (StringReader reader = new StringReader(rowWithHeader)) {
            CsvToBean<MyPojo> beanReader = new CsvToBeanBuilder<MyPojo>(reader)
                .withType(MyPojo.class)
                .withSeparator(';')
                .build();
            return beanReader.iterator().next();
          }
        })
        .handle(MongoDb
            .outboundGateway(mongoTemplate)
            .entityClass(MyPojo.class)
            .collectionNameFunction(m -> "mypojo")
            .collectionCallback(
                (collection, message) -> {
                  MyPojo myPojo = (MyPojo) message.getPayload();
                  Document document = new Document();
                  mongoTemplate.getConverter().write(myPojo, document);
                  return collection.insertOne(document);
                }))
        .channel("logChannel")
        .get();
  }

  @Bean
  IntegrationFlow logFiles() {
    return IntegrationFlow
        .from("logChannel")
        .handle(message -> log.info("message received: {}", message))
        .get();
  }

  @Bean
  IntegrationFlow logErrors() {
    return IntegrationFlow
        .from("errorChannel")
        .handle(message -> {
          MessagingException exception = (MessagingException) message.getPayload();
          log.error("error message received: {} for message {}", exception.getMessage(),
              exception.getFailedMessage());
        })
        .get();
  }
  ...
}

Update - aggregation step

.aggregate(aggregatorSpec -> aggregatorSpec
            .correlationStrategy(message -> message
                .getHeaders()
                .get(FileHeaders.REMOTE_FILE))
            .releaseStrategy(group -> group.size() >= 5)
            .groupTimeout(200L)
            .sendPartialResultOnExpiry(true))
.handle(MongoDb...)

Solution

  • I think you need to look into a FileAggregator which is able to collect lines into a list according to the FileMarkerReleaseStrategy: https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-aggregator.

    Then yes, you can cast a payload into a List in your collectionCallback() and perform batch insert into a MongoDB.

    UPDATE

    To be able to chunk from the aggregator using the same correlation key (file name), we need to remove already released group from the store. See AggregatorSpec.expireGroupsUponCompletion() option and respective docs: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#releasestrategy