I have a Flux stream. For each element processed, I want to trigger an asynchronous, non-blocking action, for example a method that returns a Mono from a DB update.
I want this action to be triggered in the doOnNext block.
I don't want it to affect the Flux, its processing, or the backpressure implemented there.
Suppose the Mono-returning method to be called is
Mono<Integer> dbUpdate();
Should my Flux look like this?
public Flux<Data> processData(PollRequest request)
{
    return searchService.search(request)
        .doOnNext(data -> dbUpdate(data));
}
Or should it be as shown in a Stack Overflow example?
public Flux<Data> processData(PollRequest request)
{
    return searchService.search(request)
        .doOnNext(data -> dbUpdate(data).subscribe());
}
Won't the above cause blocking issues inside doOnNext?
Also, which scheduler is the most appropriate for this type of action?
dbUpdate() will be ignored if you do not subscribe to it. The following snippet doesn't print anything because Mono.just("db update") never gets subscribed.
Mono<String> dbUpdate() {
    return Mono.just("db update")
        .doOnNext(System.out::println);
}

public Flux<String> processData() {
    return Flux.just("item 1", "item 2")
        .doOnNext(data -> dbUpdate());
}
Note that .subscribe() doesn't block your thread: it kicks off the work and returns without waiting for the result, provided dbUpdate() is itself non-blocking.
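To make the difference concrete, here is a minimal sketch comparing the two variants from the question. The class name, the executed list, and the body of dbUpdate are hypothetical stand-ins for the real database call; only the Reactor operators themselves come from the library.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class SubscribeInDoOnNext {

    // Records which updates actually ran, so the effect is observable.
    static final List<String> executed = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for the real database update (assumption).
    static Mono<String> dbUpdate(String data) {
        return Mono.fromCallable(() -> {
            executed.add(data);
            return "updated " + data;
        });
    }

    // The Mono is assembled but never subscribed, so the update never runs.
    static Flux<String> withoutSubscribe() {
        return Flux.just("item 1", "item 2")
                .doOnNext(data -> dbUpdate(data));
    }

    // Subscribing inside doOnNext triggers the update for each element.
    static Flux<String> withSubscribe() {
        return Flux.just("item 1", "item 2")
                .doOnNext(data -> dbUpdate(data).subscribe());
    }

    public static void main(String[] args) {
        withoutSubscribe().blockLast();
        System.out.println(executed); // prints []

        withSubscribe().blockLast();
        System.out.println(executed); // prints [item 1, item 2]
    }
}
```

Keep in mind that a subscription started this way is detached from the outer Flux: errors from dbUpdate are not propagated downstream, and the outer backpressure does not limit how many updates are in flight.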