Resilience4j rate limiter is not working properly in project reactor?


I'm currently researching the resilience4j library and for some reason the following code doesn't work as expected:

@Test
public void testRateLimiterProjectReactor()
{
    // Step 1.
    // Allow 2 permits per 1-second refresh period; a caller waits at most 2 seconds for a permit.
    RateLimiterConfig config = RateLimiterConfig.custom()
                                                .limitForPeriod(2)
                                                .limitRefreshPeriod(Duration.ofSeconds(1))
                                                .timeoutDuration(Duration.ofSeconds(2))
                                                .build();

    // Step 2.
    // Create a RateLimiter and use it.
    RateLimiterRegistry registry = RateLimiterRegistry.of(config);
    RateLimiter rateLimiter = registry.rateLimiter("myReactorServiceNameLimiter");

    // Step 3.
    // Apply the rate limiter operator to the Flux and log its signals.
    Flux<Integer> flux = Flux.range(0, 10)
                             .transformDeferred(RateLimiterOperator.of(rateLimiter))
                             .log();

    StepVerifier.create(flux)
                .expectNextCount(10)
                .expectComplete()
                .verify();
}

According to the official examples here and here, this should limit request() to 2 elements per second. However, the logs show that all of the elements are fetched immediately:

15:08:24.587 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
15:08:24.619 [main] INFO reactor.Flux.Defer.1 - onSubscribe(RateLimiterSubscriber)
15:08:24.624 [main] INFO reactor.Flux.Defer.1 - request(unbounded)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(0)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(1)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(2)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(3)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(4)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(5)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(6)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(7)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(8)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(9)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onComplete()

I don't see what's wrong. What am I missing?


Solution

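  • As already answered in the comments above, RateLimiter limits subscriptions, not elements. The Flux in the question is subscribed to only once, so a single permit is taken and all ten elements pass through immediately, as the log shows. To rate-limit the elements themselves, you can use limitRate, which caps how many elements are requested from the publisher at a time, together with buffer and delayElements, which pace the emission. For example: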

            Flux.range(1, 100)
                .delayElements(Duration.ofMillis(100)) // imitates a publisher that produces one element every 100 ms
                .log()
                .limitRate(10) // requests elements from the publisher in batches of at most 10
                .buffer(10) // groups the integers into lists of 10
                .delayElements(Duration.ofSeconds(2)) // emits one list every 2 seconds
                .subscribe(System.out::println);
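
To make the difference between per-subscription and per-element permits concrete, here is a small plain-Java sketch. It is a toy model and not Resilience4j's implementation: ToyWindowLimiter, perSubscription and perElement are hypothetical names, time is counted in whole refresh cycles instead of real seconds, and timeoutDuration is ignored (a caller is assumed to wait as long as needed). With limitForPeriod = 2, it reports the refresh cycle in which each of 10 elements would be released.

```java
import java.util.ArrayList;
import java.util.List;

public class PermitModel {

    /** Toy fixed-window limiter (hypothetical, not the Resilience4j class). */
    static final class ToyWindowLimiter {
        private final int limitForPeriod;
        private int cycle = 0;       // current refresh cycle, starting at 0
        private int usedInCycle = 0; // permits already handed out in this cycle

        ToyWindowLimiter(int limitForPeriod) {
            if (limitForPeriod < 1) {
                throw new IllegalArgumentException("limitForPeriod must be >= 1");
            }
            this.limitForPeriod = limitForPeriod;
        }

        /** Takes one permit and returns the cycle in which it is granted. */
        int acquire() {
            if (usedInCycle == limitForPeriod) {
                // The current window is exhausted: move on to the next refresh cycle.
                cycle++;
                usedInCycle = 0;
            }
            usedInCycle++;
            return cycle;
        }
    }

    /** One permit for the whole stream: every element is released in that permit's cycle. */
    public static List<Integer> perSubscription(int elements, int limitForPeriod) {
        ToyWindowLimiter limiter = new ToyWindowLimiter(limitForPeriod);
        int grantedIn = limiter.acquire(); // taken once, at subscription time
        List<Integer> cycles = new ArrayList<>();
        for (int i = 0; i < elements; i++) {
            cycles.add(grantedIn);
        }
        return cycles;
    }

    /** One permit per element: each element is released in the cycle of its own permit. */
    public static List<Integer> perElement(int elements, int limitForPeriod) {
        ToyWindowLimiter limiter = new ToyWindowLimiter(limitForPeriod);
        List<Integer> cycles = new ArrayList<>();
        for (int i = 0; i < elements; i++) {
            cycles.add(limiter.acquire());
        }
        return cycles;
    }

    public static void main(String[] args) {
        System.out.println("per subscription: " + perSubscription(10, 2));
        // prints: per subscription: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
        System.out.println("per element:      " + perElement(10, 2));
        // prints: per element:      [0, 0, 1, 1, 2, 2, 3, 3, 4, 4]
    }
}
```

Under these assumptions, one permit per subscription lets all ten elements through in cycle 0, which matches the log in the question, while one permit per element spreads them over five refresh cycles.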