Advice precedence problem when one @Around advice does not proceed


Updated to rephrase question with additional information

We have two annotations:

  • CustomLogging
  • PollableStreamListener

Both are implemented using aspects with Spring AOP.

CustomLogging annotation:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CustomLogging {

}

CustomLoggingAspect class:

@Aspect
@Component
@Slf4j
@Order(value = 1)
public class CustomLoggingAspect {

  @Before("@annotation(customLogging)")
  public void addCustomLogging(CustomLogging customLogging) {
    log.info("Logging some information");
  }

}

PollableStreamListener annotation:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PollableStreamListener {

}

PollableStreamListenerAspect class:

@Aspect
@Component
@Slf4j
public class PollableStreamListenerAspect {

  private final ExecutorService executor = Executors.newFixedThreadPool(1);

  private volatile boolean paused = false;

  @Around(value = "@annotation(pollableStreamListener) && args(dataCapsule,..)")
  public void receiveMessage(ProceedingJoinPoint joinPoint,
      PollableStreamListener pollableStreamListener, Object dataCapsule) {
    if (dataCapsule instanceof Message) {
      Message<?> message = (Message<?>) dataCapsule;
      AcknowledgmentCallback callback = StaticMessageHeaderAccessor
          .getAcknowledgmentCallback(message);
      callback.noAutoAck();

      if (!paused) {
        // The separate thread is not busy with a previous message, so process this message:
        Runnable runnable = () -> {
          try {
            paused = true;

            // Call method to process this Kafka message
            joinPoint.proceed();

            callback.acknowledge(Status.ACCEPT);
          } catch (Throwable e) {
            callback.acknowledge(Status.REJECT);
            throw new PollableStreamListenerException(e);
          } finally {
            paused = false;
          }
        };

        executor.submit(runnable);
      } else {
        // The separate thread is busy with a previous message, so re-queue this message for later:
        callback.acknowledge(Status.REQUEUE);
        log.info("Re-queue");
      }
    }
  }

}

We have a class called CleanupController which regularly executes according to a schedule.

CleanupController class:

  @Scheduled(fixedDelayString = "${app.pollable-consumer.time-interval}")
  public void pollForDeletionRequest() {
    log.trace("Polling for new messages");
    cleanupInput.poll(cleanupSubmissionService::submitDeletion);
  }

When the schedule executes, it calls a method in another class that is annotated with both PollableStreamListener and CustomLogging. I've added a Thread.sleep() to imitate the method taking a while to execute.

  @PollableStreamListener
  @CustomLogging
  public void submitDeletion(Message<?> received) {
    try {
      log.info("Starting processing");
      Thread.sleep(10000);
      log.info("Finished processing");
    } catch (Exception e) {
      log.info("Error", e);
    }
  }

The problem I'm facing is that the output produced by CustomLogging is printed each time we poll for new messages via the @Scheduled method, but I only want it printed if the annotated method is actually executed (which may happen now or later, depending on whether another message is currently being processed). The result is confusing log output: it implies that the message is being processed now when in fact it has been re-queued for future execution.

Is there some way that we can make these annotations work well together so that the CustomLogging output only happens if the annotated method executes?


Update to use @Order on PollableStreamListenerAspect

As per the suggestion of @dunni, I made the following changes to the original example above.

Set the order of 1 on the PollableStreamListenerAspect:

@Aspect
@Component
@Slf4j
@Order(value = 1)
public class PollableStreamListenerAspect {
...
}

Increase the order to 2 for CustomLoggingAspect:

@Aspect
@Component
@Slf4j
@Order(value = 2)
public class CustomLoggingAspect {
...
}

After making these changes, I found that the polling no longer detects new requests at all. The change to PollableStreamListenerAspect is what introduced this issue: when I commented out its @Order line and re-ran the application, things behaved as they did before.


Update to use @Order(value = Ordered.HIGHEST_PRECEDENCE) on PollableStreamListenerAspect

I've updated PollableStreamListenerAspect to use HIGHEST_PRECEDENCE and changed the @Around value:

@Aspect
@Component
@Slf4j
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {

  private final ExecutorService executor = Executors.newFixedThreadPool(1);

  private volatile boolean paused = false;

  @Around(value = "@annotation(au.com.brolly.commons.stream.PollableStreamListener)")
  public void receiveMessage(ProceedingJoinPoint joinPoint) {
    if (!paused) {
      // The separate thread is not busy with a previous message, so process this message:
      Runnable runnable = () -> {
        try {
          paused = true;

          // Call method to process this Kafka message
          joinPoint.proceed();
        } catch (Throwable e) {
          e.printStackTrace();
          throw new PollableStreamListenerException(e);
        } finally {
          paused = false;
        }
      };

      executor.submit(runnable);
    } else {
      // The separate thread is busy with a previous message, so re-queue this message for later:
      log.info("Re-queue");
    }
  }
}

This is partially working now. When I send a Kafka message, it gets processed, and the logging from the CustomLogging annotation only prints if another Kafka message isn't being processed. So far so good.

The next challenge is to get the @Around to accept the Message that is being provided via Kafka. I attempted this using the above example with the following lines changed:

  @Around(value = "@annotation(au.com.brolly.commons.stream.PollableStreamListener) && args(dataCapsule,..)")
  public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) {
...
}

The server starts correctly, but when I publish a Kafka message, I get the following exception:

2021-04-22 10:38:00,055 ERROR [scheduling-1] org.springframework.core.log.LogAccessor: org.springframework.messaging.MessageHandlingException: nested exception is java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation), failedMessage=GenericMessage...
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.doHandleMessage(DefaultPollableMessageSource.java:330)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.handle(DefaultPollableMessageSource.java:361)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:219)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:200)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:68)
    at xyx.pollForDeletionRequest(CleanupController.java:35)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.argBinding(AbstractAspectJAdvice.java:596)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:624)
    at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:72)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at xyz.CleanupSubmissionServiceImpl$$EnhancerBySpringCGLIB$$8737f6f8.submitDeletion(<generated>)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.doHandleMessage(DefaultPollableMessageSource.java:327)
    ... 17 more

Solution

  • Because of a Spring AOP problem with advice that calls proceed() asynchronously, you need to use @Order(value = Ordered.HIGHEST_PRECEDENCE) on PollableStreamListenerAspect. It is odd, but then it works the way you want. IMO the issue ought to be fixed in Spring, though. The workaround is ugly and only works if the aspect calling proceed() asynchronously really does have the highest precedence, which is not always the case. As an alternative, you could use native AspectJ and its own mechanism for declaring advice precedence, which is independent of Spring internals.

    Here is a simplified version of your application as an MCVE:

    Annotations:

    package de.scrum_master.spring.q67155048;
    
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface CustomLogging {}
    
    package de.scrum_master.spring.q67155048;
    
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface PollableStreamListener {}
    

    Component with method carrying both annotations:

    package de.scrum_master.spring.q67155048;
    
    import org.springframework.stereotype.Component;
    
    @Component
    public class MyComponent {
      private int counter = 0;
    
      @PollableStreamListener
      @CustomLogging
      public void submitDeletion() {
        try {
          System.out.println("  Starting processing #" + ++counter);
          Thread.sleep(1000);
          System.out.println("  Finished processing #" + counter);
        }
        catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
    

    Aspects:

    package de.scrum_master.spring.q67155048;
    
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Before;
    import org.springframework.stereotype.Component;
    
    @Aspect
    @Component
    public class CustomLoggingAspect {
      @Before("@annotation(de.scrum_master.spring.q67155048.CustomLogging)")
      public void addCustomLogging() {
        System.out.println("Logging");
      }
    }
    
    package de.scrum_master.spring.q67155048;
    
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.springframework.core.Ordered;
    import org.springframework.core.annotation.Order;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    @Aspect
    @Component
    @Order(value = Ordered.HIGHEST_PRECEDENCE)
    public class PollableStreamListenerAspect {
      public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
      private volatile boolean paused = false;
    
      @Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener)")
      public void receiveMessage(ProceedingJoinPoint joinPoint) throws Throwable {
        System.out.println("Receiving message");
        if (!paused) {
          // The separate thread is not busy with a previous message, so process this message:
          Runnable runnable = () -> {
            try {
              paused = true;
              joinPoint.proceed();
            }
            catch (Throwable throwable) {
              throwable.printStackTrace();
            }
            finally {
              paused = false;
            }
          };
          EXECUTOR_SERVICE.submit(runnable);
        }
        else {
          System.out.println("  Re-queue");
        }
      }
    }
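
    One caveat about the paused flag in this demo: it is checked on the calling thread but only set to true later, inside the worker thread. Two calls arriving in quick succession could therefore both pass the check, and the second would queue behind the first instead of being re-queued. Below is a minimal sketch of one way to close that gap, assuming an AtomicBoolean is acceptable in place of the volatile field. BusyGateSketch and trySubmit are hypothetical names, not part of the MCVE or of Spring:

    ```java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical helper, for illustration only: an atomic "busy" gate in front of an executor.
    class BusyGateSketch {
      private final AtomicBoolean busy = new AtomicBoolean(false);
      private final ExecutorService executor;

      BusyGateSketch(ExecutorService executor) {
        this.executor = executor;
      }

      // Returns true if the task was accepted, false if the caller should re-queue.
      boolean trySubmit(Runnable task) {
        // Claim the slot on the calling thread, so that check and set are a single atomic step.
        if (!busy.compareAndSet(false, true)) {
          return false;
        }
        try {
          executor.submit(() -> {
            try {
              task.run();
            } finally {
              busy.set(false); // release the slot once the task is done
            }
          });
          return true;
        } catch (RuntimeException e) {
          busy.set(false); // the executor rejected the task, so release the slot again
          throw e;
        }
      }

      public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        BusyGateSketch gate = new BusyGateSketch(executor);
        CountDownLatch release = new CountDownLatch(1);

        // The first task blocks until released, so the gate stays busy for the second call.
        boolean first = gate.trySubmit(() -> {
          try {
            release.await();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
        boolean second = gate.trySubmit(() -> { });

        System.out.println(first + " " + second); // prints "true false"
        release.countDown();
        executor.shutdown();
      }
    }
    ```

    In the aspect, the Runnable handed to trySubmit would wrap joinPoint.proceed(), and a false return value corresponds to the re-queue branch.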
    

    Driver application:

    The application calls the target method every 500 ms, but execution takes 1,000 ms. So we expect to see ~50% of the calls being re-queued without any logging in that case, because the higher precedence aspect does not proceed to the target method.

    package de.scrum_master.spring.q67155048;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Configuration;
    
    @SpringBootApplication
    @Configuration
    public class DemoApplication {
      public static void main(String[] args) throws InterruptedException {
        try (ConfigurableApplicationContext appContext = SpringApplication.run(DemoApplication.class, args)) {
          doStuff(appContext);
        }
      }
    
      private static void doStuff(ConfigurableApplicationContext appContext) throws InterruptedException {
        MyComponent myComponent = appContext.getBean(MyComponent.class);
        for (int i = 0; i < 10; i++) {
          myComponent.submitDeletion();
          Thread.sleep(500);
        }
        // This is just to make the application exit cleanly
        PollableStreamListenerAspect.EXECUTOR_SERVICE.shutdown();
      }
    }
    

    Console log:

      .   ____          _            __ _ _
     /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::        (v2.1.8.RELEASE)
    
    2021-04-20 12:56:03.675  INFO 13560 --- [           main] d.s.spring.q67155048.DemoApplication     : Starting DemoApplication on Xander-Ultrabook with PID 13560 (C:\Users\alexa\Documents\java-src\spring-aop-playground\target\classes started by alexa in C:\Users\alexa\Documents\java-src\spring-aop-playground)
    ...
    2021-04-20 12:56:07.666  INFO 13560 --- [           main] d.s.spring.q67155048.DemoApplication     : Started DemoApplication in 4.65 seconds (JVM running for 7.181)
    Receiving message
    Logging
      Starting processing #1
    Receiving message
      Re-queue
      Finished processing #1
    Receiving message
    Logging
      Starting processing #2
    Receiving message
      Re-queue
      Finished processing #2
    Receiving message
    Logging
      Starting processing #3
    Receiving message
      Re-queue
      Finished processing #3
    Receiving message
    Logging
      Starting processing #4
    Receiving message
      Re-queue
      Finished processing #4
    Receiving message
    Logging
      Starting processing #5
    Receiving message
      Re-queue
      Finished processing #5
    2021-04-20 12:56:12.767  INFO 13560 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
    ...
    

    See? We count 10x "Receiving message", but only 5x "Re-queue" and 5x "Logging". Please note that I numbered the processing calls because they are asynchronous. That way it is easier to follow when they start and finish.
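
    Why the ordering matters can be shown without Spring at all. The following plain-Java sketch models the two pieces of advice as nested wrappers. It is a simplification (synchronous, with a fixed busy flag) and not how Spring builds its interceptor chain internally; AdviceChainSketch and its methods are hypothetical names:

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Plain-Java stand-in for an advice chain; no Spring involved, names are illustrative only.
    class AdviceChainSketch {

      // "Around"-style step: continues only when not busy, like the gate in PollableStreamListenerAspect.
      static Runnable gate(boolean busy, Runnable next, List<String> log) {
        return () -> {
          log.add("Receiving message");
          if (busy) {
            log.add("Re-queue"); // the chain stops here; nothing nested inside runs
          } else {
            next.run();
          }
        };
      }

      // "Before"-style step: logs, then always continues.
      static Runnable logging(Runnable next, List<String> log) {
        return () -> {
          log.add("Logging");
          next.run();
        };
      }

      // Builds the chain in the requested order, runs it once, and returns what was logged.
      static List<String> invoke(boolean gateOutermost, boolean busy) {
        List<String> log = new ArrayList<>();
        Runnable target = () -> log.add("Processing");
        Runnable chain = gateOutermost
            ? gate(busy, logging(target, log), log)
            : logging(gate(busy, target, log), log);
        chain.run();
        return log;
      }

      public static void main(String[] args) {
        System.out.println(invoke(true, true));  // gate has highest precedence, busy
        System.out.println(invoke(false, true)); // logging has highest precedence, busy
      }
    }
    ```

    With the gate outermost, a busy call records only "Receiving message" and "Re-queue". With logging outermost, "Logging" is recorded even though the target never runs, which is the behavior described in the question.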


    Update in response to user comment:

    I have updated my MCVE in order to reproduce your problem with the parameter binding. The new or changed files are:

    package de.scrum_master.spring.q67155048;
    
    public class Message<T> {
      private T content;
    
      public Message(T content) {
        this.content = content;
      }
    
      @Override
      public String toString() {
        return "Message{" +
          "content=" + content +
          '}';
      }
    }
    
    package de.scrum_master.spring.q67155048;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Configuration;
    
    @SpringBootApplication
    @Configuration
    public class DemoApplication {
      public static void main(String[] args) throws InterruptedException {
        try (ConfigurableApplicationContext appContext = SpringApplication.run(DemoApplication.class, args)) {
          doStuff(appContext);
        }
      }
    
      private static void doStuff(ConfigurableApplicationContext appContext) throws InterruptedException {
        MyComponent myComponent = appContext.getBean(MyComponent.class);
        Message<String> message = new Message<>("Hi there!");
        for (int i = 0; i < 10; i++) {
          myComponent.submitDeletion(message);
          Thread.sleep(500);
        }
        // This is just to make the application exit cleanly
        PollableStreamListenerAspect.EXECUTOR_SERVICE.shutdown();
      }
    }
    
    package de.scrum_master.spring.q67155048;
    
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.springframework.core.Ordered;
    import org.springframework.core.annotation.Order;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    @Aspect
    @Component
    @Order(value = Ordered.HIGHEST_PRECEDENCE)
    public class PollableStreamListenerAspect {
      public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
      private volatile boolean paused = false;
    
      @Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && args(dataCapsule, ..)")
      public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) throws Throwable {
        System.out.println("Receiving message");
        if (!paused) {
          // The separate thread is not busy with a previous message, so process this message:
          Runnable runnable = () -> {
            try {
              paused = true;
              System.out.println("dataCapsule = " + dataCapsule);
              joinPoint.proceed();
            }
            catch (Throwable throwable) {
              throwable.printStackTrace();
            }
            finally {
              paused = false;
            }
          };
          EXECUTOR_SERVICE.submit(runnable);
        }
        else {
          System.out.println("  Re-queue");
        }
      }
    }
    

    This yields the same exception you reported:

    Exception in thread "main" java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation)
        at org.springframework.aop.aspectj.AbstractAspectJAdvice.argBinding(AbstractAspectJAdvice.java:605)
        at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633)
        at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
        at de.scrum_master.spring.q67155048.MyComponent$$EnhancerBySpringCGLIB$$4baa410d.submitDeletion(<generated>)
        at de.scrum_master.spring.q67155048.DemoApplication.doStuff(DemoApplication.java:21)
        at de.scrum_master.spring.q67155048.DemoApplication.main(DemoApplication.java:13)
    

    You are hitting a known Spring problem, and I have commented on the closed Spring issue #16956 about it, hoping to get it reopened and fixed.

    For now, your workaround is not to use the elegant AOP argument binding but to fetch the parameter manually using JoinPoint.getArgs():

    package de.scrum_master.spring.q67155048;
    
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.springframework.core.Ordered;
    import org.springframework.core.annotation.Order;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    @Aspect
    @Component
    @Order(value = Ordered.HIGHEST_PRECEDENCE)
    public class PollableStreamListenerAspect {
      public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
      private volatile boolean paused = false;
    
      //@Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && args(dataCapsule, ..)")
      @Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && execution(* *(*, ..))")
      //public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) throws Throwable {
      public void receiveMessage(ProceedingJoinPoint joinPoint) throws Throwable {
        Object dataCapsule = joinPoint.getArgs()[0];
        System.out.println("Receiving message");
        if (!paused) {
          // The separate thread is not busy with a previous message, so process this message:
          Runnable runnable = () -> {
            try {
              paused = true;
              System.out.println("dataCapsule = " + dataCapsule);
              joinPoint.proceed();
            }
            catch (Throwable throwable) {
              throwable.printStackTrace();
            }
            finally {
              paused = false;
            }
          };
          EXECUTOR_SERVICE.submit(runnable);
        }
        else {
          System.out.println("  Re-queue");
        }
      }
    }
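
    Note that joinPoint.getArgs()[0] comes back as a plain Object, so the type check from your original aspect (dataCapsule instanceof Message) is still needed before using it. Here is a small sketch of a null-safe, type-safe lookup. JoinPointArgsSketch and firstArg are hypothetical helper names, and the Object[] parameter stands in for the result of joinPoint.getArgs():

    ```java
    import java.util.Optional;

    // Hypothetical helper, for illustration only.
    class JoinPointArgsSketch {

      // Returns the first argument if it exists and has the expected type, otherwise an empty Optional.
      static <T> Optional<T> firstArg(Object[] args, Class<T> type) {
        if (args == null || args.length == 0 || !type.isInstance(args[0])) {
          return Optional.empty();
        }
        return Optional.of(type.cast(args[0]));
      }
    }
    ```

    Inside the advice this could be called as firstArg(joinPoint.getArgs(), Message.class), with the empty case handled the same way as a non-Message argument in the original code.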
    

    With the manual getArgs() lookup in the aspect above, it works again (submitDeletion now takes the Message and prints it):

    Receiving message
    dataCapsule = Message{content=Hi there!}
    Logging
      Starting processing #1, message = Message{content=Hi there!}
    Receiving message
      Re-queue
    Receiving message
      Re-queue
      Finished processing #1, message = Message{content=Hi there!}
    Receiving message
    dataCapsule = Message{content=Hi there!}
    Logging
      Starting processing #2, message = Message{content=Hi there!}
    Receiving message
      Re-queue
      Finished processing #2, message = Message{content=Hi there!}
    Receiving message
    dataCapsule = Message{content=Hi there!}
    Logging
      Starting processing #3, message = Message{content=Hi there!}
    Receiving message
      Re-queue
      Finished processing #3, message = Message{content=Hi there!}
    Receiving message
    dataCapsule = Message{content=Hi there!}
    Logging
      Starting processing #4, message = Message{content=Hi there!}
    Receiving message
      Re-queue
      Finished processing #4, message = Message{content=Hi there!}
    Receiving message
    dataCapsule = Message{content=Hi there!}
    Logging
      Starting processing #5, message = Message{content=Hi there!}