I would like to have a continuously running job with Flux that calls a service and processes its response. Processing a response can take longer than the interval between service calls. I have sample code for this, but it doesn't do what I want.
    Flux.interval(Duration.ZERO, Duration.ofSeconds(1), Schedulers.boundedElastic())
        .onBackpressureDrop()
        .doOnNext(counter -> something())
        .onErrorContinue(...)
        .doOnComplete(...)
        .subscribe();
The problem: the tick fires every second, but if something() needs 5 seconds to complete, something() is called only every 5 seconds instead of every second. How can I modify the code so that every something() call gets its own thread (from a bounded thread pool)? I have tested that every subscriber gets a dedicated thread, so I could increase throughput with multiple (fixed) subscribers, but I would like to make it more dynamic.
Thanks!
How about this:
    Flux.interval(Duration.ZERO, Duration.ofSeconds(1), Schedulers.boundedElastic())
        .onBackpressureDrop()
        // flatMap subscribes to each inner Mono eagerly, so slow calls can overlap
        .flatMap(counter -> Mono.fromCallable(() -> {
            something();
            return counter; // returned only so the callable has a value, in case something() is void
        }).subscribeOn(Schedulers.parallel()))
        .onErrorContinue(...)
        .doOnComplete(...)
        .subscribe();
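subscribeOn(Schedulers.parallel()) runs each inner Mono on Reactor's parallel scheduler, so several something() calls can be in flight at once instead of one slow call holding up the next tick. Note that this pool is not unbounded: by default it is a fixed-size pool with one worker per CPU core, and it is intended for non-blocking work. If something() blocks (for example while waiting on the service), Schedulers.boundedElastic() is the better fit, since it is also bounded but sized for blocking tasks. Because flatMap returns a Flux, you can also continue the stream with further operators.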
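If you want to see the underlying idea without Reactor on the classpath, here is a rough plain-JDK sketch of it: one thread emits ticks and each tick is handed to a bounded worker pool, so slow calls overlap. This is only an analogue, not how Reactor implements flatMap or subscribeOn. The class name OverlappingTicks, the runDemo helper and the something(long) signature are made up for the illustration. One difference from the Flux version: when all workers are busy, extra ticks are queued here rather than dropped as onBackpressureDrop() would do.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class OverlappingTicks {

    // Hypothetical stand-in for the slow something() from the question.
    static void something(long workMillis) throws InterruptedException {
        Thread.sleep(workMillis);
    }

    /**
     * Emits `ticks` ticks, one every periodMillis, and submits each to a fixed
     * pool of poolSize workers. Returns the peak number of something() calls
     * that were running at the same time.
     */
    static int runDemo(int ticks, long periodMillis, long workMillis, int poolSize) {
        ScheduledExecutorService ticker = Executors.newSingleThreadScheduledExecutor();
        ExecutorService workers = Executors.newFixedThreadPool(poolSize);
        AtomicInteger emitted = new AtomicInteger();
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(ticks);

        ticker.scheduleAtFixedRate(() -> {
            if (emitted.getAndIncrement() >= ticks) {
                return; // enough ticks emitted, ignore the rest
            }
            // The ticker thread never runs something() itself, so it stays on schedule.
            workers.submit(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    something(workMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    done.countDown();
                }
            });
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await(); // wait until every submitted call has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ticker.shutdownNow();
            workers.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // Ticks every 100 ms, each call takes 500 ms, at most 4 workers.
        System.out.println("peak concurrent calls: " + runDemo(6, 100, 500, 4));
    }
}
```

The peak never exceeds the pool size, which is the "bounded thread pool" behaviour asked for; in the Flux version the scheduler you pass to subscribeOn plays the role of the worker pool.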