Please help me to find right way to save flux of entities, resceived from Cassandra, to Elasticserch.
For exaple I have huge flux of entyties:
Flux<User> user = cassandraService.getUsers();
How can I save all flux of entyties to Elasticsearch using reactive way? Which method for calling Elasticksearch is better to use:
And how to block until last entity in flux will be processed?
First, short explanation of flatMap()
vs map()
:
flatMap()
should be used for non-blocking operations (asynchronous) - in short, anything which returns back Mono, Flux,map()
should be used to transform an object by applying a synchronous function to each item (performed in fixed time).So if you want to save a document in Elasticsearch you should use flatMap()
- this is an async operation and its execution time is indeterministic.
When it comes to doOnNext()
, it is used to add side-effect behavior, triggered when the Flux emits an item, e.g. for logging or some other action that should happen after saving has finished.
Example:
Mono.just(productDto)
.map(dto -> new Product(dto.getName(), dto.isAvailable()))
.flatMap(elasticsearchOperations::save)
.doOnNext(savedProduct -> logger.info("Saved {}", savedProduct.getProductName()))
.subscribe();
And you do not have to block on save()
, this is the responsibility of flatMap()
.