Can't figure out how to stop processing Flux on first match.
This what I have right now:
findAll(): Flux<Object>
findStorageId(Relation r): Mono<Long> | Mono.empty()
isPassing(Relation r): boolean
findAll().flatMap(p -> {
return Flux.fromStream(p.getRelations().stream()).flatMap(r -> {
return isPassing(r) ? findStorageId(r) : Mono.empty();
});
})
.handle((Long storageId, SynchronousSink<Long> sink) -> {
if (storageId != null) {
sink.next(storageId);
sink.complete();
}
})
.next()
.switchIfEmpty(Mono.error(new RuntimeException("Can't find storageId.")));
I'm trying to understand how I can interrupt processing of flux when first storageId is found. Right now I see, that first flatMap
continues to work after finding first match.
The problem is that flatmap is using using concurrency and prefetch is more than 1. In this case if you dont want to call the database many times but one by one you need to use concatmap with prefatch 1.
public static final String TO_BE_FOUND = "B";
@Override
public void run(String... args) throws Exception {
Mono<String> storageId =
Flux.just("A", "B", "C", "D", "A")
.doOnNext(id -> System.out.printf("processing: %s\n", id))
.concatMap(s -> findStorageId(s),1)
.next()
.switchIfEmpty(Mono.error(new RuntimeException("Can't find storageId.")));
storageId.subscribe();
}
private static Mono<String> findStorageId(String s) {
return TO_BE_FOUND.equals(s)
? Mono.just(s + UUID.randomUUID()).delayElement(Duration.ofSeconds(1))
: Mono.delay(Duration.ofSeconds(1)).flatMap(aLong -> Mono.empty());
}
in this case concatmap with prefetch 1 will request elements 1 by one and it will wait for the response.