Tags: java, rx-java, rx-java2, micronaut, resilience4j

RxJava equivalent of simple ThreadPoolExecutor example


I've been out of the Java game for ~8 years and a lot has changed since then. The biggest challenge for me has been RxJava / reactive. I'm looking for rough guidance on how to do the equivalent below in a fully reactive way.

The basic requirement, implemented below using ThreadPoolExecutor, is to process a large amount of Stuff by calling a remote web service, which has a documented rate limit of 100 requests/minute. My goal is to process as much as possible as fast as possible, without dropping any Stuff but still honoring the downstream rate limit. This code has been simplified by leaving out error handling, bulkheads, circuit breakers, retry logic, etc.

This code currently works fine, but it results in what feels like a lot of wasted threads given all the non-blocking reactive options. Even the HTTP client I'm using to call my service returns a Flowable, which I'm simply blocking on in each of the executor's 20 threads.

I'd love to understand what the reactive equivalent should be. Where I've struggled is that almost all the docs I find showcase static sources for the Observable (e.g. Observable.fromArray(1, 2, 3, 4, 5)). I know the solution likely involves IoScheduler and maybe groupBy, but I have yet to figure out how to merge the Flowables coming from my HTTP client into a complete chain that does parallelization (up to a limit, such as 20) and rate limiting.
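A chain does not have to start from a static source. As a minimal sketch (assuming RxJava 2; String stands in for Stuff, and callService is a hypothetical stub in place of httpClient.exchange(...)), onNewStuff can push each item into a PublishProcessor, and the rest of the chain merges the per-item Flowables with flatMap, capped at 20 concurrent calls. Rate limiting is not covered here:

```java
import io.reactivex.Flowable;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.PublishProcessor;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: String stands in for the question's Stuff type.
class HotSourceSketch {
    // Push-based (hot) source: each onNewStuff call emits one item into the chain.
    // toSerialized() makes onNext safe to call from several threads at once.
    private final FlowableProcessor<String> incoming =
            PublishProcessor.<String>create().toSerialized();
    private final AtomicInteger inFlight = new AtomicInteger();

    final AtomicInteger maxInFlight = new AtomicInteger();
    final List<String> results = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);

    HotSourceSketch(int maxConcurrency) {
        incoming
                // Unbounded buffer in the role of the LinkedBlockingQueue: without it,
                // a burst larger than flatMap's demand fails with MissingBackpressureException.
                .onBackpressureBuffer()
                // Merges the per-item Flowables, at most maxConcurrency at a time.
                .flatMap(this::callService, maxConcurrency)
                .subscribe(results::add, error -> done.countDown(), done::countDown);
    }

    // Reactive counterpart of onNewStuff: no thread is parked per item.
    void onNewStuff(String stuff) {
        incoming.onNext(stuff);
    }

    // Hypothetical stand-in for httpClient.exchange(...): emits a response after 50 ms
    // on RxJava's computation scheduler instead of blocking a thread.
    Flowable<String> callService(String stuff) {
        return Flowable.timer(50, TimeUnit.MILLISECONDS)
                .map(tick -> "processed " + stuff)
                // Bookkeeping only, to observe the concurrency cap.
                .doOnSubscribe(s -> maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max))
                .doOnTerminate(inFlight::decrementAndGet);
    }

    // Hypothetical demo helper: pushes a burst of `count` items, then waits for all responses.
    static HotSourceSketch runBurst(int count, int maxConcurrency) {
        HotSourceSketch sketch = new HotSourceSketch(maxConcurrency);
        for (int i = 0; i < count; i++) {
            sketch.onNewStuff("stuff-" + i);
        }
        sketch.incoming.onComplete(); // no more Stuff in this demo
        try {
            sketch.done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sketch;
    }

    public static void main(String[] args) {
        HotSourceSketch sketch = runBurst(100, 20);
        System.out.println(sketch.results.size() + " responses, max in flight: "
                + sketch.maxInFlight.get());
    }
}
```

No thread is held while a call is outstanding; the maxInFlight counter exists only to make the cap observable.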

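import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.client.RxHttpClient;
import io.micronaut.http.client.annotation.Client;
import io.reactivex.Flowable;

import javax.inject.Inject;
import java.time.Duration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Stuff is the application's own domain class (not shown)
public class Example {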
    private static final int THREADS = 20;

    // using https://docs.micronaut.io/latest/guide/index.html#httpClient
    @Client("http://stuff-processor.internal:8080")
    @Inject
    RxHttpClient httpClient;

    private ThreadPoolExecutor executor;
    private final RateLimiter rateLimiter;

    public Example() {
        // up to 20 threads to process the unbounded queue
        // incoming Stuff is very bursty...
        // ...we could go hours without anything and then hundreds could come in
        this.executor = new ThreadPoolExecutor(THREADS, THREADS,
                30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        this.executor.allowCoreThreadTimeOut(true);

        // using https://resilience4j.readme.io/docs/ratelimiter
        RateLimiterConfig config = RateLimiterConfig.custom()
                .limitRefreshPeriod(Duration.ofSeconds(60))
                .limitForPeriod(100)
                .timeoutDuration(Duration.ofSeconds(90))
                .build();
        RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);
        rateLimiter = rateLimiterRegistry.rateLimiter("stuff-processor", config);
    }

    /**
     * Called when the user takes an action that can cause 1 or 1000s of new
     * Stuff to be entered into the system. Each instance of Stuff results in
     * a separate call to this method. Ex: 100 Stuffs = 100 calls.
     */
    void onNewStuff(Stuff stuff) {
        final Runnable task = () -> {
            final Flowable<HttpResponse<Boolean>> flowable = httpClient.exchange(
                    HttpRequest.POST("/process", stuff),
                    Boolean.class);

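            // blocks this worker thread until the response arrives
            final HttpResponse<Boolean> response = flowable.blockingFirst();
            // body() may be null, so avoid unboxing it directly
            if (Boolean.TRUE.equals(response.body())) {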
                System.out.println("Success!");
            } else {
                System.out.println("Fail :(");
            }
        };

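        final Runnable rateLimitedTask =
                RateLimiter.decorateRunnable(rateLimiter, task);
        // submit() stores any exception (e.g. RequestNotPermitted when no permit is
        // granted within the 90 s timeout) in the returned Future, which is ignored here
        executor.submit(rateLimitedTask);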
    }
}

Thank you!


Solution

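  • First, to build this in a completely non-blocking manner, you need a non-blocking, asynchronous HTTP client, for example one built on Netty. Micronaut's RxHttpClient is itself built on Netty and returns a Flowable, so it should qualify, provided you subscribe to that Flowable instead of calling blockingFirst() on it.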

    Say you have a list called stuffs. This is how I would do it:

    Observable.fromIterable(stuffs)
            .flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io()))
            .subscribe();
    

    flatMap merges the responses as they arrive, in completion order rather than input order.

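    To limit concurrency, flatMap has an overload whose second parameter, maxConcurrency, caps the number of inner streams it subscribes to at the same time. This bounds how many calls are in flight, not how many are made per minute. Say you want to make no more than 10 calls at once. Do this: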

    Observable.fromIterable(stuffs)
            .flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io()), 10)
            .subscribe();
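The maxConcurrency argument alone does not enforce 100 requests/minute. One way to add that, sketched here under the assumption that evenly spaced requests are acceptable, is to release items no faster than one every 600 ms (60,000 ms / 100) with concatMap and a timer in front of the flatMap. This assumes RxJava 2; nonBlockingPost is a hypothetical stub that completes after 50 ms, and the values are Strings rather than Stuff:

```java
import io.reactivex.Flowable;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

class RateLimitedSketch {
    // Hypothetical stand-in for client.nonBlockingPost(a): "responds" after 50 ms
    // on RxJava's computation scheduler without blocking the caller.
    static Flowable<String> nonBlockingPost(String stuff) {
        return Flowable.timer(50, TimeUnit.MILLISECONDS).map(tick -> "ok " + stuff);
    }

    // Releases items at least spacingMillis apart, then runs at most
    // maxConcurrency calls at the same time.
    static Flowable<String> process(Flowable<String> stuffs, long spacingMillis, int maxConcurrency) {
        return stuffs
                // concatMap only subscribes to the next timer after the previous one
                // has completed, so consecutive items are released at least spacingMillis apart.
                .concatMap(stuff -> Flowable.timer(spacingMillis, TimeUnit.MILLISECONDS)
                        .map(tick -> stuff))
                // Caps in-flight calls; this bounds concurrency, not the rate.
                .flatMap(RateLimitedSketch::nonBlockingPost, maxConcurrency);
    }

    public static void main(String[] args) {
        List<String> stuffs = Arrays.asList("a", "b", "c", "d", "e");
        long start = System.nanoTime();
        // 600 ms spacing = at most 100 requests per minute. blockingGet() is only
        // used so this demo waits for the results before exiting.
        List<String> results = process(Flowable.fromIterable(stuffs), 600, 20)
                .toList()
                .blockingGet();
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(results + " after " + elapsedMs + " ms");
    }
}
```

The spacing bounds the request rate, while maxConcurrency bounds how many calls are outstanding if the service responds more slowly than the spacing. The trade-off is that even the first item after an idle period waits one interval. With a push-based source such as a PublishProcessor instead of fromIterable, an onBackpressureBuffer() in front of the concatMap would be needed so bursts are queued rather than rejected.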