I am playing with Replaying Reactor Sinks, I am trying to achieve a mix of a unicast and a replay processor. I would like it to emit to only one subscriber at the same (UnicastProcessor), but that it can also emit a default value on subscribe (ReplayProcessor). Here is something similar to the real case:
Flux<Boolean> monoC = Sinks.many().replay().latestOrDefault(true).asFlux().doOnNext(integer -> System.out.println(new Date() + " - " + Thread.currentThread().getName() + " emiting next"));
for(int i = 0; i < 5; i++) {
new Thread(() -> {
monoC.flatMap(unused ->
webClientBuilder.build()
.get()
.uri("https://www.google.com")
.retrieve()
.toEntityFlux(String.class)
.doOnSuccess(stringResponseEntity -> {
System.out.println(new Date() + " - " + Thread.currentThread().getName() + " finished processing");
})
).subscribe();
}).start();
}
That is printing:
emiting next
...
emiting next
finished processing
...
finished processing
Instead, I would like it to print:
emiting next
finished processing
...
emiting next
finished processing
Update, some more clarifications on the real case scenario:
The real case scenario is: I have a Spring WebFlux application that acts like a relay, it receives a request on a specific endpoint A, and it relays it to another microservice B. This microservice can then reply with a 429 if I go too fast, and in a header with how long I have to wait before retrying again. The retrying thing I have already achieved it with a .retry operator and a Mono.delay, but in the meantime, I can receive another request on my first endpoint A which will have to be blocked until the Mono.delay finishes.
I am trying to achieve this with a Replay Sink, so that after receiving a 429, I emit a "false" to the sink and after Mono.delay is over, it emits a true to the sink, so if in the mean time I receive any further request on A it can filter out all the falses and wait for a true to be emitted.
The problem i have on top of that is that, when I receive too many request to relay on A, microservice B starts responding slow, and getting overloaded. Therefore, i would like to limit the rate that the Sink is emitting. To be precise, i would like the publisher to emit a value, but don't emit any more until the subscriber hits onCompleted.
As soon as I understood your issue correctly, you want the requests to B being processed sequentially. In that case you should have a look at https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#flatMap-java.util.function.Function-int-
public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency)
I think your case should look like
//sinks should be global variable for your controller, initialized in @PostConstruct
var sinks = Sinks
//unsafe is required for multithreading
.unsafe()
.many()
.replay()
.latest();
sinks.asFlux()
.doOnNext(it -> System.out.printf("%s is emitting %s\n", Thread.currentThread().getName(), it))
.flatMap(counter -> {
return webClientBuilder.build()
.get()
.uri("https://www.google.com")
.retrieve()
.toEntityFlux(String.class)
.doOnSuccess(stringResponseEntity -> {
System.out.println(counter + " " + new Date() + " - " + Thread.currentThread().getName() + " finished processing with " + stringResponseEntity.getStatusCode());
})
.then(Mono.just(counter));
//concurrency = 1 causes the flatMap being handled only once in parallel
}, 1)
.doOnError(Throwable::printStackTrace)
//this subscription also must be done in @PostConstruct
.subscribe(counter -> System.out.printf("%s completed in %s\n", counter, Thread.currentThread().getName()));
//and this is your endpoint method
for (int i = 0; i < 5; i++) {
int counter = i;
new Thread(() -> {
var result = sinks.tryEmitNext(counter);
if (result.isFailure()) {
//mb in that case you should retry
System.out.printf("%s emitted %s. with fail: %s\n", Thread.currentThread().getName(), counter, result);
} else {
System.out.printf("%s successfully emitted %s\n", Thread.currentThread().getName(), counter);
}
}).start();
}