I have a REST service (let's call it 'MAIN SERVICE') that calls other remote services. For performance reasons, I need MAIN SERVICE to answer immediately when one of the remote calls throws an error, but I still need to keep listening for the other calls' responses so that I can log them or run verifications.
Basically, I have something like this in 'MAIN SERVICE':
    public ResponseEntity<SomeType> mainService() {
        Mono<A> remoteCallA = getRemoteCallA();
        Mono<B> remoteCallB = getRemoteCallB();
        SomeType result = Mono.zip(remoteCallA, remoteCallB)
                .doOnSuccess(...)
                .map(...)
                .block();
        return ResponseEntity.ok(result);
    }
Another service that calls remote A and does something with the result:
    // Takes 1 second to respond
    public Mono<A> getRemoteCallA() {
        return client.get()
                ...
                .doOnSuccess(response -> {
                    // MANDATORY EXECUTION
                });
    }
Another service that calls remote B, which throws an error:
    // Error thrown after 500 ms
    public Mono<B> getRemoteCallB() {
        return client.get()
                ...
                .doOnError(exception -> {
                    // some logging
                });
    }
My problem is that call B fails before call A completes, and I want to return an HTTP response immediately. But then, when call A completes, its doOnSuccess is not triggered (nor is any other doOnXXX method).
I am not sure, but I think the zip stream is cancelled (I saw it with log()), so A's events are never triggered?
zipDelayError is not a solution, because then I would have to wait 1 second before responding with the error.
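The suspected cancellation can be checked with a minimal sketch like the one below. It assumes Reactor is on the classpath; the class name `ZipCancelRepro`, the `Mono.delay` stand-ins for the two remote calls, and the `events` list are all illustrative and not part of the real services.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Mono;

public class ZipCancelRepro {
    // Runs the zip once and returns the signals that were observed.
    static List<String> run() throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();

        // Hypothetical stand-in for remote call A: succeeds after 1 s.
        Mono<String> a = Mono.delay(Duration.ofMillis(1000))
                .map(tick -> "A")
                .doOnSuccess(value -> events.add("A success"))
                .doOnCancel(() -> events.add("A cancelled"));

        // Hypothetical stand-in for remote call B: fails after 500 ms.
        Mono<String> b = Mono.delay(Duration.ofMillis(500))
                .then(Mono.<String>error(new IllegalStateException("B failed")))
                .doOnError(error -> events.add("B error"));

        // Plain zip: fails as soon as one of its sources fails.
        Mono.zip(a, b).subscribe(
                tuple -> events.add("zip success"),
                error -> events.add("zip error"));

        // Wait past the point where A would have completed.
        Thread.sleep(1500);
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

If the suspicion is right, the recorded events include a cancellation of A and no success signal for it, which is why the doOnSuccess callback of call A never runs.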
What seems to be working:
- cache the Publishers so that they are hot sources
- use separate zip calls so that you can return one to the client and handle errors in your local code
- handle those errors in the local pipeline with onErrorXXX
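    // Imports: reactor.core.publisher.Mono, reactor.core.scheduler.Schedulers, and @Test from JUnit.
    @Test
    void so_78147735() throws InterruptedException {
        // A succeeds after 1 s, B fails after 500 ms.
        var monoA = Mono.fromSupplier(() -> delay(1000L, false, "A")).subscribeOn(Schedulers.boundedElastic());
        var monoB = Mono.fromSupplier(() -> delay(500L, true, "B")).subscribeOn(Schedulers.boundedElastic());
        // cache() shares a single execution of each call between the two pipelines below.
        var cachedA = monoA.cache();
        var cachedB = monoB.cache();
        // Pipeline returned to the client: fails as soon as B fails.
        var response = Mono.zip(cachedA, cachedB).subscribeOn(Schedulers.boundedElastic());
        // Local pipeline: replaces B's error with a fallback so that A's result is still processed.
        var local = Mono.zip(cachedA.map(resultOfA -> resultOfA.replace("A", "SUCCESS")),
                        cachedB.onErrorReturn("FAIL"))
                .subscribeOn(Schedulers.boundedElastic());
        response.subscribe(
                objects -> System.out.println("response mono: " + objects.getT1() + " " + objects.getT2()),
                error -> System.out.println("response mono failed: " + error.getMessage()));
        local.subscribe(objects -> System.out.println("local mono: " + objects.getT1() + " " + objects.getT2()));
        // Wait longer than the slowest call (1 s) so that the local pipeline can finish.
        Thread.sleep(1500L);
    }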
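    // Simulates a remote call: blocks for the given time, then returns the id or throws.
    private String delay(long delay, boolean error, String id) {
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (error) {
            throw new IllegalArgumentException(id);
        }
        System.out.println("delay method: " + id);
        return id;
    }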
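To relate this back to the service from the question, here is a hedged sketch of how the same idea might be wired into a blocking method. It is not the original service: `FailFastSketch`, the `EVENTS` list, the `Mono.delay` stand-ins for the two remote calls, and the plain `String` return value (instead of a `ResponseEntity`) are all assumptions made to keep the example self-contained.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Mono;

public class FailFastSketch {
    // Hypothetical event log standing in for real logging or verification.
    static final List<String> EVENTS = new CopyOnWriteArrayList<>();

    // Stand-in for getRemoteCallA(): succeeds after 1 s.
    static Mono<String> remoteCallA() {
        return Mono.delay(Duration.ofMillis(1000))
                .map(tick -> "A")
                .doOnSuccess(a -> EVENTS.add("A done (mandatory work)"));
    }

    // Stand-in for getRemoteCallB(): fails after 500 ms.
    static Mono<String> remoteCallB() {
        return Mono.delay(Duration.ofMillis(500))
                .then(Mono.<String>error(new IllegalStateException("B failed")))
                .doOnError(e -> EVENTS.add("B error logged"));
    }

    static String mainService() {
        // cache() lets both pipelines share one execution of each call.
        Mono<String> a = remoteCallA().cache();
        Mono<String> b = remoteCallB().cache();

        // Background pipeline: B's error is replaced by a fallback,
        // so this zip keeps waiting for A and is never cancelled by B.
        Mono.zip(a, b.onErrorReturn("FAIL"))
                .subscribe(
                        t -> EVENTS.add("local: " + t.getT1() + " " + t.getT2()),
                        e -> EVENTS.add("local error: " + e.getMessage()));

        // Response pipeline: fails as soon as B fails.
        try {
            return Mono.zip(a, b)
                    .map(t -> t.getT1() + t.getT2())
                    .block();
        } catch (RuntimeException e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(mainService());
        // Give the background pipeline time to finish before printing.
        Thread.sleep(1000);
        System.out.println(EVENTS);
    }
}
```

The background subscription is fire-and-forget here; in a real service, its consumer is where the logging or verification would go, while the caller only waits for the fail-fast pipeline.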