elasticsearch reactive-programming spring-data-elasticsearch

Save a Flux of records using Spring Data Elasticsearch in a reactive manner


Please help me find the right way to save a Flux of entities, received from Cassandra, to Elasticsearch.

For example, I have a huge Flux of entities:

Flux<User> user = cassandraService.getUsers();

How can I save the whole Flux of entities to Elasticsearch in a reactive way? Which method is better to use for calling Elasticsearch:

  • user.map(u-> reactiveElasticsearchOperations.save(u, INDEX_NAME));
  • user.flatMap(u-> reactiveElasticsearchOperations.save(u, INDEX_NAME));
  • user.onNext(u-> reactiveElasticsearchOperations.save(u, INDEX_NAME));
  • ...something else?

And how do I block until the last entity in the Flux has been processed?


Solution

  • First, a short explanation of flatMap() vs. map():

    • flatMap() should be used for non-blocking (asynchronous) operations, in short, anything that returns a Mono or a Flux.
    • map() should be used to transform an object by applying a synchronous function to each item (performed in fixed time).

    So if you want to save a document in Elasticsearch, you should use flatMap(): saving is an asynchronous operation and its execution time is non-deterministic.
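To make the difference concrete, here is a minimal sketch that assumes only reactor-core on the classpath. The `fakeSave` method and the `saved` counter are hypothetical stand-ins for `reactiveElasticsearchOperations.save(u, INDEX_NAME)`, not part of any library. It illustrates that with map() the inner Mono objects are created but never subscribed, so no save runs, while flatMap() subscribes to each of them.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MapVsFlatMapSketch {

    // Counts how many times the simulated save actually executed.
    static final AtomicInteger saved = new AtomicInteger();

    // Hypothetical stand-in for reactiveElasticsearchOperations.save(u, INDEX_NAME).
    // Mono.fromCallable is lazy: the callable runs only when the Mono is subscribed.
    static Mono<String> fakeSave(String user) {
        return Mono.fromCallable(() -> {
            saved.incrementAndGet();
            return user;
        });
    }

    // map() produces a Flux<Mono<String>>; nobody subscribes to the inner Monos.
    static int savesWithMap(List<String> users) {
        saved.set(0);
        Flux.fromIterable(users).map(MapVsFlatMapSketch::fakeSave).blockLast();
        return saved.get();
    }

    // flatMap() subscribes to every inner Mono and flattens the results into a Flux<String>.
    static int savesWithFlatMap(List<String> users) {
        saved.set(0);
        Flux.fromIterable(users).flatMap(MapVsFlatMapSketch::fakeSave).blockLast();
        return saved.get();
    }

    public static void main(String[] args) {
        List<String> users = List.of("alice", "bob", "carol");
        System.out.println("saves with map: " + savesWithMap(users));
        System.out.println("saves with flatMap: " + savesWithFlatMap(users));
    }
}
```

In this sketch the map() variant reports zero saves and the flatMap() variant reports one save per user.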

    When it comes to doOnNext(), it is used to add side-effect behavior, triggered when the Flux emits an item, e.g. for logging or some other action that should happen after saving has finished.

    Example:

    Mono.just(productDto)
        .map(dto -> new Product(dto.getName(), dto.isAvailable()))
        .flatMap(elasticsearchOperations::save)
        .doOnNext(savedProduct -> logger.info("Saved {}", savedProduct.getProductName()))
        .subscribe();
    

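    And you do not have to block on each save() call: flatMap() subscribes to the Mono returned by save() and merges its result into the resulting stream. To wait until the last entity has been processed, block once on the whole pipeline (for example with blockLast()) rather than on individual saves.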
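As an illustration of waiting for the last entity, here is a minimal sketch, again assuming only reactor-core. The in-memory `index` list and the `fakeSave` method are hypothetical stand-ins for Elasticsearch and for `reactiveElasticsearchOperations.save(u, INDEX_NAME)`, and the concurrency limit of 4 is an arbitrary choice for the example.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SaveAllAndWaitSketch {

    // Hypothetical in-memory "index" standing in for Elasticsearch.
    static final List<String> index = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for reactiveElasticsearchOperations.save(u, INDEX_NAME):
    // completes asynchronously after a short delay.
    static Mono<String> fakeSave(String user) {
        return Mono.delay(Duration.ofMillis(10))
                .map(tick -> {
                    index.add(user);
                    return user;
                });
    }

    // Builds the pipeline only; nothing is saved until something subscribes.
    static Mono<Long> saveAll(Flux<String> users) {
        return users
                .flatMap(SaveAllAndWaitSketch::fakeSave, 4) // at most 4 saves in flight at once
                .count();                                   // completes after the last save
    }

    public static void main(String[] args) {
        Flux<String> users = Flux.just("alice", "bob", "carol");
        // block() subscribes and waits until every save has completed.
        Long savedCount = saveAll(users).block();
        System.out.println("saved " + savedCount + " users");
    }
}
```

Blocking like this is usually reserved for the edge of an application, such as a main method or a test. Inside a reactive pipeline, returning the Mono and letting the caller subscribe avoids tying up a thread.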