Tags: java, rxjs, spring-webflux, reactive-programming, project-reactor

Conditionally cancel() Flux and preserve value causing cancel


I have the following reactive Flux stream in Java:

Flux.just(e1, e2, e3, e4, e5)
   .flatMapSequentially(/* process elements */)
   // scan passes the running accumulator first, then the current element
   .scan(new Accumulator(null, 0), (acc, element) -> new Accumulator(element, element.children.size + acc.total()))
   .takeWhile(/* conditionally process till for example e3, need accumulator value here */)
   .map(/* post processing */)

record Accumulator(Element element, int total) {}

In takeWhile, the cancellation happens conditionally at element e3. Once the stream is cancelled, that value is no longer available to the subsequent .map(), so the stream returns only e1 and e2. I need e3 as well as e1 and e2. How can I preserve e3, the element that triggers the cancellation?


Solution

  • You can use takeUntil instead:

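    takeUntil relays values until the predicate matches and includes the matching element in the output, unlike takeWhile(java.util.function.Predicate<? super T>), which drops it. Note that the condition is inverted: takeWhile continues while its predicate is true, whereas takeUntil stops once its predicate becomes true.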

    Example:

    import org.junit.jupiter.api.Test;
    import reactor.core.publisher.Flux;
    import reactor.test.StepVerifier;
    
    public class TakeUntilTest {
    
        @Test
        public void takeUntilIncludesStopElement() {
            var datasStream = Flux.range(1, 10)
                    // Stop when we encounter value 3
                    .takeUntil(i -> i.equals(3));
            StepVerifier.create(datasStream)
                    // verify resulting flow include stop value
                    .expectNext(1, 2, 3)
                    .verifyComplete();
        }
    }
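
Applied to the pipeline from the question, the same idea might look like the sketch below. The Element record, its children list, the collectUntilLimit helper and the limit parameter are all assumptions made up for illustration, since the question does not show them. The skip(1) is there because scan with a seed emits the seed first (here an Accumulator with a null element).

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class TakeUntilAccumulatorSketch {

    // Hypothetical stand-in for the question's Element type
    record Element(String name, List<String> children) {}

    record Accumulator(Element element, int total) {}

    // Hypothetical helper: keeps elements up to and including the one
    // whose running child count reaches the given limit
    static List<Accumulator> collectUntilLimit(List<Element> elements, int limit) {
        return Flux.fromIterable(elements)
                // Running total: accumulator first, current element second
                .scan(new Accumulator(null, 0),
                        (acc, element) -> new Accumulator(
                                element, acc.total() + element.children().size()))
                // Drop the seed value that scan emits before any element
                .skip(1)
                // Emit the element that reaches the limit, then complete
                .takeUntil(acc -> acc.total() >= limit)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        List<Element> elements = List.of(
                new Element("e1", List.of("a", "b")),
                new Element("e2", List.of("c")),
                new Element("e3", List.of("d", "e", "f")),
                new Element("e4", List.of("g")),
                new Element("e5", List.of()));
        collectUntilLimit(elements, 5)
                .forEach(acc -> System.out.println(acc.element().name() + " " + acc.total()));
    }
}
```

With the sample data in main, the running totals are 2, 3 and 6, so the stream stops at e3 and still passes it downstream, where a .map() could post-process it.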