I have the below async tasks:
public class AsyncValidationTask {
// Returns Mono.error(new Exception()) if error, otherwise Mono.empty()
public Mono<Void> execute(Object o);
}
public class AsyncSaveTask {
// Returns Mono.error(new Exception()) if error, otherwise Mono of Object
public Mono<Object> execute(Object o);
}
And below Service class:
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
// Right now, the problem is that when validation completes successfully, it
// emits Mono.empty hence the flatMap chained below will not be invoked at all.
.flatMap(dontcare -> this.save.execute(o))
}
}
As shown above, I tried to use flatMap
to chain the AsyncSaveTask.execute
call if the AsyncValidationTask.execute
completes successfully, it won't work because nothing is emitted (Mono.empty) upon completion.
I also consider then
to chain the second call, but it will always invoke the chained call regardless of the Mono.error produced by the first validation call.
How can I chain them properly?
.then
for terminal only sources to chainUse .then
, in order to chain your execution with the process, which only sends a terminal signal.
Also, pay attention, if you need to do something on the error signal, then you have to accompany your .then
with onErrorResume
beforehand.
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
.onErrorResume(t -> ...) // should be before then
.then(this.save.execute(o))
}
}
.defer
in order to postpone mono creationIn order to execute the this.save.execute(o)
only in case of successful validation, you have to wrap it in Mono.defer
as well:
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
.onErrorResume(t -> ...) // should be before then
.then(Mono.defer(() -> this.save.execute(o)))
}
}
Usually, it is not necessary, because Mono
is a LAZY type which SHOULD start doing work only in case subscription happened (subscription == .subscribe()
).
The implementation of Mono#then
guarantees that subscription to Mono
returned by the this.save.execute
the method starts RIGHT AFTER the Mono.defer(() -> this.validation.execute(o))
completed.
The only reason why execution may start earlier may be a PURPOSE (e.g., business logic which provides such behavior on purpose - caching; hot source, etc.) OR an INCORRECT implementation of the this.save.execute(o)
which starts doing work regardless of actual subscription.
In general, it is a good practice to ensure that API which does work and expose that work as a Publisher
(e.g. Mono
| Flux
) is Lazy.
It means that the API creator MUST ensure that the work execution happens only in case the user has subscribed to the given Publisher
instance.
For example, if your async API does CompletableFuture
creation underneath, it worth to manually wrap your CompletableFuture
creation into Mono.defer
or to use proper method extension, e.g Mono.fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier)
Let's consider how to make a regular ThreadPool task submission Reactive.
interface Executor {
Future<T> execute(Callable<T> runnable);
}
So, in order to make Executor
reactive, we have to create something like the following:
interface ReactiveExecutor {
Mono<T> execute(Callable<T> runnable);
}
The following is a possible implementation of such an adapter which works:
class ReactiveExecutorAdapter {
final Executor delegate;
...
Mono<T> execute(Callable<T> runnable) {
MonoProcessor<T> result = MonoProcessor.create();
Future<T> task = delegate.execute(() -> {
T value = runnable.call();
result.onNext(value);
result.onComplet();
return value;
});
return result.doOnCancel(() -> task.cancel());
}
}
Definitely, such an implementation will be working. However, it has a few critical issues:
Publisher
)Mono
, which supports later subscription. .subscribe
method happened (then we got value leak which impossible to handle)Mono execute(..)
with Mono.defer
outside of the implementation (see the original problem in the question). Subsequently, it leads to the fact that an API user can easily 'shoot your self in the foot' forgetting to wrap execution with an extra .defer
Basically, it is enough to move the Mono.defer
into the library internals. It will make the life of the API users much easier since they don't have to think when it is necessary to use deferring (hence - less possible issues).
For example, the simplest solution for our Reactive Executor can be the following:
class ReactiveExecutorAdapter {
final Executor delegate;
...
Mono<T> execute(Callable<T> runnable) {
Mono.defer(() -> {
MonoProcessor<T> result = MonoProcessor.create();
Future<T> task = delegate.execute(() -> {
T value = runnable.call();
result.onNext(value);
result.onComplet();
return value;
});
return result.doOnCancel(() -> task.cancel());
})
}
}
By just deferring the execution, we can solve at least one problem for sure - ensure value is not leaked anymore.
However, in order to solve all the possible problems in that particular case, we may use Mono.create
which is properly designed for adopting async API:
class ReactiveExecutorAdapter {
final Executor delegate;
...
Mono<T> execute(Callable<T> runnable) {
Mono.crete(monoSink -> {
Future<T> task = delegate.execute(() -> {
T value = runnable.call();
monoSink.complete(value);
});
monoSink.doOnCancel(task::cancel);
})
}
}
using Mono.create
we have a guarantee of lazy execution on every subscriber.
Also, using MonoSink
API, we can quickly hook on all the essential signals from the subscriber.
Finally, Mono.create ensures that in case of anything, the value will be discarded appropriately.
Finally, having such an API it is not necessary to use defer for all the cases