When I call block() on the Mono<> instance returned by collectList() in the sample below, my code hangs despite having called complete() on the emitter.
I recognize that calling block() isn't advised under most circumstances. I'm writing test code, and my use of it seemed reasonable -- aside from the fact that it doesn't work.
The code below is a slightly modified version of this SO post.
ConnectableFlux<Integer> connect = Flux.<Integer>create(emitter -> {
    Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
          .forEach(t -> emitter.next(t));
    emitter.complete();
}).publish(); // EDIT <- use .replay()
connect.subscribe(v -> System.out.println("1: " + v));
Mono<List<Integer>> mono = connect
    .filter(number -> number > 6)
    .collectList();
mono.subscribe(v -> System.out.println("4: " + v));
connect.connect();
List<Integer> results = mono.block(); // hangs here
EDIT: Using replay() instead of publish(), as shown in the comment above, lets block() return as I had initially expected.
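To make the difference concrete, here is a minimal sketch that runs the same source through publish() and replay() and then blocks as a late subscriber. It assumes reactor-core is on the classpath. The class and method names (LateSubscriberDemo, source, collectAfterConnect) are made up for illustration, and the one-second timeout is an arbitrary choice so that the publish() case fails fast instead of hanging.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

public class LateSubscriberDemo {

    // Builds the source from the question; useReplay selects replay() over publish().
    static ConnectableFlux<Integer> source(boolean useReplay) {
        Flux<Integer> numbers = Flux.<Integer>create(emitter -> {
            for (int i = 1; i <= 10; i++) {
                emitter.next(i);
            }
            emitter.complete();
        });
        return useReplay ? numbers.replay() : numbers.publish();
    }

    // Subscribes once, connects, and only then blocks as a late subscriber.
    // Returns null if the late subscriber received nothing within the timeout.
    static List<Integer> collectAfterConnect(boolean useReplay) {
        ConnectableFlux<Integer> connect = source(useReplay);
        connect.subscribe(v -> { }); // early subscriber, as in the question
        connect.connect();           // the source emits 1..10 and completes here
        try {
            return connect.filter(n -> n > 6)
                          .collectList()
                          .block(Duration.ofSeconds(1)); // arbitrary timeout
        } catch (IllegalStateException timeout) {
            return null; // block(Duration) throws IllegalStateException on timeout
        }
    }

    public static void main(String[] args) {
        System.out.println("publish: " + collectAfterConnect(false));
        System.out.println("replay:  " + collectAfterConnect(true));
    }
}
```

With publish() the late subscriber should time out, while with replay() it should receive the cached values and the completion signal. Using block(Duration) rather than block() is a reasonable habit in test code, since a missed signal then shows up as a failure rather than a stuck test.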
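When you call block(), you create a new subscription to your Mono, and through it to the ConnectableFlux, but nothing connects the flux to its source for that subscription. The earlier connect.connect() call has already run the source to completion, so it doesn't affect this new subscription, which waits for a connection that never comes.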
Your last line could look like this:
List<Integer> results = connect
    .autoConnect()
    .filter(number -> number > 6)
    .collectList()
    .block();
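For completeness, here is a self-contained sketch of that approach, again assuming reactor-core is on the classpath. The class and method names are hypothetical. The second method is an alternative that is not part of the snippet above: it subscribes through toFuture() before calling connect(), which keeps the explicit connect() call from the question while still making sure the collecting subscriber is attached before the source emits.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ConnectOrderingDemo {

    // Same source as in the question, using publish().
    static ConnectableFlux<Integer> source() {
        return Flux.<Integer>create(emitter -> {
            for (int i = 1; i <= 10; i++) {
                emitter.next(i);
            }
            emitter.complete();
        }).publish();
    }

    // autoConnect() connects to the source when the first subscriber
    // arrives, which here is the subscription created by block().
    static List<Integer> withAutoConnect() {
        return source()
            .autoConnect()
            .filter(number -> number > 6)
            .collectList()
            .block();
    }

    // Alternative (an assumption, not from the answer): subscribe first
    // via toFuture(), then connect explicitly, then wait for the result.
    static List<Integer> subscribeThenConnect() {
        ConnectableFlux<Integer> connect = source();
        Mono<List<Integer>> mono = connect
            .filter(number -> number > 6)
            .collectList();
        CompletableFuture<List<Integer>> future = mono.toFuture(); // subscribes immediately
        connect.connect(); // the source emits while the future's subscriber is attached
        return future.join();
    }

    public static void main(String[] args) {
        System.out.println(withAutoConnect());
        System.out.println(subscribeThenConnect());
    }
}
```

Both methods should return the values greater than 6. The common point is ordering: the subscriber that collects the list has to be attached before the source is connected, or the source has to be one that caches its signals.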