java, spring-webflux, project-reactor, reactor

Reactor Sink that emits only 1 event at a time?


I am experimenting with replaying Reactor Sinks, trying to achieve a mix of a unicast and a replay processor. I would like it to emit to only one subscriber at a time (like UnicastProcessor), but also to emit a default value on subscribe (like ReplayProcessor). Here is something similar to the real case:

Flux<Boolean> monoC = Sinks.many().replay().latestOrDefault(true).asFlux()
        .doOnNext(value -> System.out.println(new Date() + " - " + Thread.currentThread().getName() + "    emiting next"));
for(int i = 0; i < 5; i++) {
    new Thread(() -> {
        monoC.flatMap(unused ->
                webClientBuilder.build()
                        .get()
                        .uri("https://www.google.com")
                        .retrieve()
                        .toEntityFlux(String.class)
                        .doOnSuccess(stringResponseEntity -> {
                            System.out.println(new Date() + " - " + Thread.currentThread().getName() + "    finished processing");
                        })
        ).subscribe();
    }).start();
}

That is printing:

emiting next
...
emiting next
finished processing
...
finished processing

Instead, I would like it to print:

emiting next
finished processing
...
emiting next
finished processing

Update with some clarifications on the real scenario:

The real scenario is this: I have a Spring WebFlux application that acts as a relay. It receives a request on a specific endpoint A and relays it to another microservice B. If I go too fast, B can reply with a 429 and a header saying how long I have to wait before retrying. I have already implemented the retry with a .retry operator and a Mono.delay, but in the meantime I can receive another request on endpoint A, which has to be blocked until the Mono.delay finishes.

I am trying to achieve this with a replay sink: after receiving a 429 I emit false to the sink, and once the Mono.delay is over I emit true. Any further request that arrives on A in the meantime can then filter out the false values and wait for a true to be emitted.
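
The gating idea described above could be sketched roughly as follows. This is only a minimal illustration, assuming reactor-core is on the classpath; the class name RelayGate and the methods close(), open() and whenOpen() are hypothetical names introduced here, and Mono.just("relayed") stands in for the real WebClient call to B.

```java
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

import java.util.concurrent.CompletableFuture;

public class RelayGate {

    // Replays the latest state to every new subscriber; starts "open" (true).
    private final Sinks.Many<Boolean> gate = Sinks.many().replay().latestOrDefault(true);

    // Called after a 429 from B. The emit result is ignored in this sketch;
    // real code should inspect it (for example FAIL_NON_SERIALIZED on contention).
    void close() {
        gate.tryEmitNext(false);
    }

    // Called once the Mono.delay for the retry has elapsed.
    void open() {
        gate.tryEmitNext(true);
    }

    // Waits for the first "true" state, then subscribes to the given call.
    <T> Mono<T> whenOpen(Mono<T> call) {
        return gate.asFlux()
                .filter(open -> open) // skip the "false" states
                .next()               // take the first "true" and cancel
                .then(call);
    }

    public static void main(String[] args) {
        RelayGate relayGate = new RelayGate();
        relayGate.close();
        CompletableFuture<String> pending =
                relayGate.whenOpen(Mono.just("relayed")).toFuture();
        System.out.println(pending.isDone()); // prints false: the gate is closed
        relayGate.open();
        System.out.println(pending.join());   // prints relayed
    }
}
```

This gate only delays requests while B is rate limiting; it does not by itself limit how many relayed calls run at the same time.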

The problem on top of that is that when I receive too many requests to relay on A, microservice B becomes overloaded and starts responding slowly. Therefore I would like to limit the rate at which the sink emits. To be precise, I would like the publisher to emit a value and then not emit any more until the subscriber's processing of that value has completed (onComplete).


Solution

  • If I understood your issue correctly, you want the requests to B to be processed sequentially. In that case, have a look at the flatMap overload that takes a concurrency argument: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#flatMap-java.util.function.Function-int-

    public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency)
    

    I think your case should look like this:

    //the sink should be a field of your controller, initialized in @PostConstruct
            var sinks = Sinks
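                    //unsafe() disables the concurrent-emission check, so tryEmitNext calls from
                    //several threads must be synchronized by the caller; the safe Sinks.many()
                    //would instead return FAIL_NON_SERIALIZED when two threads emit at once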
                    .unsafe()
                    .many()
                    .replay()
                    .latest();
            sinks.asFlux()
                    .doOnNext(it -> System.out.printf("%s is emitting %s\n", Thread.currentThread().getName(), it))
                    .flatMap(counter -> {
                        return webClientBuilder.build()
                                .get()
                                .uri("https://www.google.com")
                                .retrieve()
                                .toEntityFlux(String.class)
                                .doOnSuccess(stringResponseEntity -> {
                                    System.out.println(counter + " " + new Date() + " - " + Thread.currentThread().getName() + "    finished processing with " + stringResponseEntity.getStatusCode());
                                })
                                .then(Mono.just(counter));
                        //concurrency = 1: flatMap subscribes to only one inner publisher at a time
                    }, 1)
                    .doOnError(Throwable::printStackTrace)
                    //this subscription must also be done in @PostConstruct
                    .subscribe(counter -> System.out.printf("%s completed in %s\n", counter, Thread.currentThread().getName()));
    
            //and this is your endpoint method
            for (int i = 0; i < 5; i++) {
                int counter = i;
                new Thread(() -> {
                    var result = sinks.tryEmitNext(counter);
                    if (result.isFailure()) {
                        //maybe in that case you should retry
                        System.out.printf("%s emitted %s. with fail: %s\n", Thread.currentThread().getName(), counter, result);
                    } else {
                        System.out.printf("%s successfully emitted %s\n", Thread.currentThread().getName(), counter);
                    }
                }).start();
            }
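
To see the effect of the concurrency argument in isolation, here is a small sketch without WebClient or a sink. It assumes reactor-core is on the classpath; SequentialFlatMapDemo, fakeCall and run are hypothetical names, and fakeCall only simulates a remote call with a 50 ms delay.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class SequentialFlatMapDemo {

    // Hypothetical stand-in for the WebClient call: completes after a short delay
    // and records when it is subscribed to and when it finishes.
    static Mono<Integer> fakeCall(int id, List<String> log) {
        return Mono.delay(Duration.ofMillis(50))
                .doOnSubscribe(s -> log.add("start " + id))
                .doOnSuccess(t -> log.add("end " + id))
                .thenReturn(id);
    }

    // Runs three simulated calls through flatMap with the given concurrency
    // and returns the recorded events in order.
    static List<String> run(int concurrency) {
        List<String> log = new CopyOnWriteArrayList<>();
        Flux.range(0, 3)
                .flatMap(id -> fakeCall(id, log), concurrency)
                .blockLast(Duration.ofSeconds(5));
        return log;
    }

    public static void main(String[] args) {
        // concurrency = 1: each call ends before the next one starts
        System.out.println(run(1)); // [start 0, end 0, start 1, end 1, start 2, end 2]
        // concurrency = 3: all three calls start before any of them ends
        System.out.println(run(3));
    }
}
```

With concurrency 1, flatMap requests the next upstream value only after the current inner publisher has completed, which is the "one at a time" behavior asked for in the question.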