Flux interval ticks on new threads


I would like to have a continuously running job with Flux that calls a service and processes its response. Processing may take longer than the interval between service calls. I have sample code for this, but it does not behave the way I want.

Flux.interval(Duration.ZERO, Duration.ofSeconds(1), Schedulers.boundedElastic())
    .onBackpressureDrop()
    .doOnNext(counter -> something())
    .onErrorContinue(...)
    .doOnComplete(...)
    .subscribe();

The problem: a tick is scheduled every second, but if something() needs 5 seconds to complete, it ends up being called every 5 seconds instead of every second. How can I modify the code so that each something() call gets its own thread (from a bounded thread pool)? I have verified that every subscriber gets a dedicated thread, so I could speed up processing with a fixed number of subscribers, but I would like something more dynamic.

Thanks!


Solution

  • How about

            Flux.interval(Duration.ZERO, Duration.ofSeconds(1), Schedulers.boundedElastic())
                    .onBackpressureDrop()
                    .flatMap(counter -> Mono.fromCallable(() -> {
                        something();
                        return counter; // returned only to satisfy the Callable, in case something() returns void
                    }).subscribeOn(Schedulers.parallel()))
                    .onErrorContinue(...)
                    .doOnComplete(...)
                    .subscribe();
    

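    The flatMap subscribes to each inner Mono as soon as its tick arrives (up to 256 at a time by default), so a slow something() no longer holds back later ticks. The subscribeOn(Schedulers.parallel()) runs each Mono on Reactor's parallel scheduler. Note that this is a fixed pool sized by default to the number of CPU cores, not an unbounded one, and it is intended for non-blocking work. If something() blocks, Schedulers.boundedElastic() is the better fit: it grows on demand but is capped (by default at ten threads per CPU core). Either way, you can continue the stream with further operators after the flatMap.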
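
    For reference, here is a minimal, self-contained sketch of the same idea using Schedulers.boundedElastic() and an explicit flatMap concurrency limit. It assumes reactor-core is on the classpath. The class name IntervalFanOut, the run helper, the maxConcurrency parameter, and the 500 ms sleep inside something() are all made up for illustration; take(ticks) and blockLast() are there only so the demo terminates, and a real long-running job would keep subscribe() instead.

```java
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class IntervalFanOut {

    // Hypothetical stand-in for the slow, blocking service call from the question.
    static void something() throws InterruptedException {
        Thread.sleep(500);
    }

    // Lets `ticks` ticks through, runs something() for each of them, and
    // returns the names of the threads that executed the calls.
    static Set<String> run(int ticks, Duration period, int maxConcurrency) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();

        Flux.interval(Duration.ZERO, period)
            // Drop ticks that arrive while maxConcurrency calls are already in flight.
            .onBackpressureDrop()
            // Demo only: stop after a fixed number of ticks.
            .take(ticks)
            .flatMap(tick -> Mono.fromCallable(() -> {
                        threadNames.add(Thread.currentThread().getName());
                        something();
                        return tick; // a Callable has to return a value
                    })
                    // Run each call on the bounded pool meant for blocking work.
                    .subscribeOn(Schedulers.boundedElastic()),
                maxConcurrency)
            // Demo only: wait for the last call to finish.
            .blockLast();

        return threadNames;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        Set<String> threadNames = run(5, Duration.ofMillis(100), 4);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("threads used: " + threadNames);
        System.out.println("elapsed ms:   " + elapsedMs);
    }
}
```

    The second argument to flatMap caps how many something() calls run at once. Combined with onBackpressureDrop(), ticks that arrive while that many calls are still running are discarded rather than queued, which may or may not be what you want for your job.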