I am writing Aspects for methods that return promises. Consider the following method:
public Mono<Stream> publishToKafka(Stream s) {
//publishToKafka is asynchronous
return Mono.just(s).flatMap(worker::publishToKafka);
}
I want to cache if the publish was successful or not. Since this is a cross-cutting concern, an Aspect looks like the best design. Here's my Aspect for it.
@Around("@annotation....")
public Object cache() {
//get the data to cache from the annotation
Object result = pjp.proceed();
cache.cache("key","data");
return result;
}
Now since publishToKafka
is asynchronous, the target method returns as soon as the thread switch happens and cache.cache()
is called. This is not what I want. What I want is that the result should be cached iff the event was successfully published to Kafka. The following advice works.
@Around("@annotation....")
public <T extends Stream<T>> Mono<T> cache() {
//get the data to cache from the annotation
return ((Mono<T>)pjp.proceed()).doOnNext(a -> cache.cache(key, data));
}
I want to understand what's going on here. Does this happen during the assembly time of the pipeline? Or during the execution time (pjp.proceed()
returns a promise) to which my advice adds the doOnNext
operator?
I need to understand assembly vs. execution time in the context of this example.
Both Spring AOP and AspectJ aspects are always executed synchronously in the same thread as the intercepted joinpoint. Thus, if your intercepted method returns immediately and the return value is something like a promise, a future or nothing (void) in combination with a callback, you cannot expect to magically get the asynchronous result in the aspect's advice. You do need to make the aspect aware of the asynchronous situation.
Having said that, I also want to mention that I never used reactive programming before, I only know the concept. From what I see in your advice, the solution should work, but one thing is not so nice: You make the advice return a new Mono
instance returned by your doOnNext(..)
call. Maybe it would be cleaner to return the original Mono
you get from proceed()
after having registered your caching callback on it, just so as to avoid any side-effects.
I don't know what else to explain, the situation is pretty clear. Feel free to ask directly related follow-up questions if my explanation does not suffice.