Search code examples
javaspringreactive-programmingproject-reactorspring-reactor

How to Count Items in a Flux, return error if count is greater than X, else continue with Pipeline


I'm new to Project Reactor in Spring, and I'm not fully sure how to perform something:

I have my pipeline the pipeline returns records. All good.

But I would like to count those records and then do something (like if else), where if records returned are > X then error, otherwise just continue.

Knowing that Count returns a Mono<Long>, then I'll lose the records after that, what could I do?

I'm thinking:

Somehow use flatMap and perform something inside this flatmap. Somehow I see there is a reduce method in Flux that might help.

The point is, I'm not sure how to proceed.


Solution

  • Not entirely sure what you want so going to provide two suggestions based on assumptions

    1.. You want to collect all elements then assess if there are more than n, throw error if so. You can use collectList, count the elements and then convert back to flux if under whatever. This will only doStuff on any element if the total is under the limit.

        Flux.range(1,10)
                .collectList()
                .flatMap(s -> 
                    s.size()>7 
                        ? Mono.error(new RuntimeException("TOO MANY!")) 
                        : Mono.just(s))
                .flatMapMany(Flux::fromIterable)
                .map(this::doStuff)
    

    2.. You want to assess number of elements on the fly, which you can use an external Atomic counter for. This will doStuff on every element up to the problematic one.

        AtomicLong count = new AtomicLong();
    
        Flux.range(1,10)
                .flatMap(s -> 
                    count.incrementAndGet() > 7 
                        ? Flux.error(new RuntimeException("TOO MANY!")) 
                        : Flux.just(s))
                 .map(this::doStuff);