Flux interval ticks on new threads

I would like a continuously running Flux job that calls a service and processes its response. Processing a response can take longer than the interval between service calls. I have some sample code, but it does not do what I want:

Flux.interval(Duration.ZERO, Duration.ofSeconds(1), Schedulers.boundedElastic())
    .doOnNext(counter -> something())

The problem: the tick fires every second, but if something() takes 5 seconds to complete, then something() is called every 5 seconds rather than every second. How can I modify the code so that each something() call gets its own thread (from a bounded thread pool)? I have verified that each subscriber gets a dedicated thread, so I could increase throughput with a fixed number of subscribers, but I would like something more dynamic.



  • How about:

            Flux.interval(Duration.ZERO, Duration.ofSeconds(1), Schedulers.boundedElastic())
                    .flatMap(counter -> Mono.fromCallable(() -> {
                        something(); // the slow call from the question
                        return counter; // simply to justify the callable, in case something() has return type void
                    }).subscribeOn(Schedulers.parallel()))

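    subscribeOn(Schedulers.parallel()) runs each Mono on Reactor's parallel scheduler, so a slow something() no longer holds up the next tick. Note that this pool is not unbounded: it is fixed-size, sized to the number of CPU cores by default, and is meant for non-blocking work. If something() blocks (for example while waiting on a service call), Schedulers.boundedElastic() is the better fit and matches the bounded pool asked for. Because flatMap emits the results, you can also continue the stream with further operators.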
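
    To make the hand-off explicit: the timer thread only submits work, and a separate bounded pool runs it. Below is a minimal sketch of that same idea using plain JDK executors instead of Reactor, so it runs without any dependency. It is not how Reactor implements flatMap or subscribeOn. The class name TickHandOff, the run method and all of its parameters are hypothetical names chosen for illustration, and something(workMillis) is a stand-in that just sleeps.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class TickHandOff {

    // Hypothetical stand-in for the slow something() from the question.
    static void something(long workMillis) {
        try {
            Thread.sleep(workMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Fires `ticks` ticks, one every `periodMillis`, and runs each on a pool of
    // `poolSize` threads. Returns each task's start time in ms since the call began.
    static List<Long> run(int ticks, long periodMillis, long workMillis, int poolSize) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ExecutorService workers = Executors.newFixedThreadPool(poolSize);
        List<Long> startTimes = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(ticks);
        AtomicInteger fired = new AtomicInteger();
        long t0 = System.nanoTime();

        // The timer thread only submits work, so it is free again for the next tick.
        timer.scheduleAtFixedRate(() -> {
            if (fired.getAndIncrement() >= ticks) {
                return; // ignore extra ticks until the timer is shut down
            }
            workers.submit(() -> {
                startTimes.add((System.nanoTime() - t0) / 1_000_000);
                something(workMillis);
                done.countDown();
            });
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await(); // wait until every submitted task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow();
            workers.shutdown();
        }
        return startTimes;
    }

    public static void main(String[] args) {
        // 5 ticks, 100 ms apart, each task taking 500 ms, on 5 worker threads.
        System.out.println(run(5, 100, 500, 5));
    }
}
```

    With a pool at least as large as the number of overlapping tasks, the printed start times should be roughly one period apart rather than one task duration apart. With poolSize set to 1, tasks queue behind each other and you are back to the behaviour described in the question.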