Tags: spring, kotlin, mono, flux, project-reactor

Proper way to create a Flux from a list of Monos


Let's say I have an API operation that consumes a List of CustomObjects. For each of those objects, it calls a service method that creates a Mono. How do I create a Flux from those Mono objects in an idiomatic, and therefore non-blocking, way?

What I've come up with for now is this. I changed the method names to better reflect their intended purpose.

fun myApiMethod(@RequestBody customObjs: List<CustomObject>): Flux<CustomObject> {

    return Flux.create { sink ->
        customObjs.forEach {

            service.persistAndReturnMonoOfCustomObject(it).map {
                sink.next(it)
            }
        }
        sink.complete()
    }
}

Moreover, do I need to subscribe to the Flux to actually make it return something?


Solution

  • I believe you can use concat() instead:

    /**
     * Concatenate all sources provided as a vararg, forwarding elements emitted by the
     * sources downstream.
     * <p>
     * Concatenation is achieved by sequentially subscribing to the first source then
     * waiting for it to complete before subscribing to the next, and so on until the
     * last source completes. Any error interrupts the sequence immediately and is
     * forwarded downstream.
     * <p>
     * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/concat.png" alt="">
     * <p>
     * @param sources The {@link Publisher} of {@link Publisher} to concat
     * @param <T> The type of values in both source and output sequences
     *
     * @return a new {@link Flux} concatenating all source sequences
     */
    @SafeVarargs
    public static <T> Flux<T> concat(Publisher<? extends T>... sources) {
    

    Or merge():

    /**
     * Merge data from {@link Publisher} sequences contained in an array / vararg
     * into an interleaved merged sequence. Unlike {@link #concat(Publisher) concat},
     * sources are subscribed to eagerly.
     * <p>
     * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/merge.png" alt="">
     * <p>
     * Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
     * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
     * in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
     * another source.
     *
     * @param sources the array of {@link Publisher} sources to merge
     * @param <I> The source type of the data sequence
     *
     * @return a merged {@link Flux}
     */
    @SafeVarargs
    public static <I> Flux<I> merge(Publisher<? extends I>... sources) {
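
Applied to the method from the question, the two operators above could look roughly like the sketch below. CustomObject and CustomObjectService here are hypothetical stand-ins for the question's types, and the service is assumed to return a cold Mono that only does its work once it is subscribed to. The sketch uses the Iterable overloads of concat() and merge() rather than the vararg ones quoted above, and adds Flux.fromIterable(...).flatMap(...) as a third common variant.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical stand-ins for the question's types (not from the original post).
data class CustomObject(val id: Int)

class CustomObjectService {
    // Assumption: returns a cold Mono; nothing happens until it is subscribed to.
    fun persistAndReturnMonoOfCustomObject(obj: CustomObject): Mono<CustomObject> =
        Mono.fromCallable { obj }
}

val service = CustomObjectService()

// concat: subscribes to each Mono only after the previous one has completed,
// so results are emitted in the order of the input list.
fun persistSequentially(customObjs: List<CustomObject>): Flux<CustomObject> =
    Flux.concat(customObjs.map { service.persistAndReturnMonoOfCustomObject(it) })

// merge: subscribes to all Monos eagerly and emits results as they arrive,
// so the output order is not guaranteed for asynchronous sources.
fun persistConcurrently(customObjs: List<CustomObject>): Flux<CustomObject> =
    Flux.merge(customObjs.map { service.persistAndReturnMonoOfCustomObject(it) })

// flatMap: builds each Mono lazily from the list element and merges the results.
fun persistWithFlatMap(customObjs: List<CustomObject>): Flux<CustomObject> =
    Flux.fromIterable(customObjs)
        .flatMap { service.persistAndReturnMonoOfCustomObject(it) }
```

In each variant, the operator subscribes to the inner Monos itself once the outer Flux is subscribed to. That is the step missing from the Flux.create attempt in the question, where the Mono returned by map is never subscribed to, so sink.next is never called. On the subscription question: when a Spring WebFlux controller method returns the Flux, the framework subscribes to it, so no explicit subscribe() call should be needed in the handler.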