Tags: java, spring, reactor

Project Reactor: block() after collectList() doesn't work for Flux.create()


When I call block() on the Mono<> instance returned by collectList() in the sample below, my code hangs despite having called complete() on the emitter.

I recognize that calling block() isn't advised under most circumstances. I'm writing test code, and my use of it seemed reasonable -- aside from the fact that it doesn't work.

The code below is a slightly modified version of this SO post.

    ConnectableFlux<Integer> connect = Flux.<Integer>create(emitter -> {
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
          .forEach(t -> emitter.next(t));
        emitter.complete();
    }).publish(); // EDIT <- use .replay() 

    connect.subscribe(v -> System.out.println("1: " + v));

    Mono<List<Integer>> mono = connect
            .filter(number -> number > 6)
            .collectList();

    mono.subscribe(v -> System.out.println("4: " + v));

    connect.connect();
    List<Integer> results = mono.block(); //hangs here

EDIT: Using replay() instead of publish() as shown in the comment above allows the block() to resolve as I had initially expected.


Solution

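  • connect.connect() subscribes to the source once, and Flux.create() emits all ten values plus the completion signal synchronously inside that call. The two subscribers registered beforehand receive everything, after which the publish() connection terminates. block() subscribes to the ConnectableFlux only after that has happened. Since publish() neither replays past signals nor reconnects to the source on its own, this late subscription waits for a connect() that never comes; the earlier connect.connect() does not affect it.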

    Your last statement could look like this, where autoConnect() connects to the source again as soon as block() subscribes:

    List<Integer> results = connect
            .autoConnect()
            .filter(number -> number > 6)
            .collectList()
            .block();
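
The three variants discussed here can be compared side by side. This is a minimal sketch, assuming reactor-core is on the classpath; the class name LateSubscriberDemo and its helper methods are hypothetical, and the timeouts are arbitrary values chosen so that the publish() case fails fast instead of hanging.

```java
import java.time.Duration;
import java.util.List;
import java.util.stream.Stream;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

// Hypothetical demo class; not part of the original post.
public class LateSubscriberDemo {

    // Same synchronous source as in the question: emits 1..10, then completes.
    static Flux<Integer> source() {
        return Flux.<Integer>create(emitter -> {
            Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).forEach(emitter::next);
            emitter.complete();
        });
    }

    // publish(): the early subscriber consumes everything during connect(),
    // so the later block() subscription never receives a signal.
    static boolean publishTimesOut() {
        ConnectableFlux<Integer> connect = source().publish();
        connect.subscribe(v -> { });
        connect.connect();
        try {
            connect.filter(n -> n > 6).collectList().block(Duration.ofMillis(500));
            return false;
        } catch (IllegalStateException e) {
            // block(Duration) reports a timeout as an IllegalStateException.
            return true;
        }
    }

    // replay(): past signals are cached and replayed to the late subscriber.
    static List<Integer> withReplay() {
        ConnectableFlux<Integer> connect = source().replay();
        connect.subscribe(v -> { });
        connect.connect();
        return connect.filter(n -> n > 6).collectList().block(Duration.ofSeconds(5));
    }

    // autoConnect(): the block() subscription itself triggers a new connection,
    // which runs the Flux.create() callback a second time.
    static List<Integer> withAutoConnect() {
        ConnectableFlux<Integer> connect = source().publish();
        connect.subscribe(v -> { });
        connect.connect();
        return connect.autoConnect()
                .filter(n -> n > 6)
                .collectList()
                .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println("publish() timed out: " + publishTimesOut());
        System.out.println("replay():      " + withReplay());
        System.out.println("autoConnect(): " + withAutoConnect());
    }
}
```

Note the trade-off: replay() serves the late subscriber from a cache without touching the source again, while autoConnect() re-subscribes to the source, so any side effects in the Flux.create() callback happen twice. In test code, passing a Duration to block() is also a cheap way to turn a silent hang into a visible failure.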