Search code examples
javamultithreadingaopaspectjmdc

How to intercept Runnable creation and execution under master thread to populate context flow data using AspectJ


Original issue

Populate the Java MDC from a thread to all its spawned inner threads (parent to children relation)

WIP solution using AspectJ

I'm able to write an aspect intercepting all Runnable creation but since I want a different aspect instance for each use (with a custom annotation) as I have to store the MDC somewhere when executing code from the parent thread, I'm unable to write a pointcut intercepting the newly created instance of Runnable so I can set the MDC using the previous context map.

Here's the aspect

@Aspect("percflow(@annotation(com.bell.cts.commons.cron.framework.scheduler.domain.MDCTrace))")
public class MDCTraceAspect {

  private final Logger logger = LoggerFactory.getLogger(MDCTraceAspect.class);
  private int i;
  private final Map<String, String> contextMap;

  public MDCTraceAspect() {
    i = new Random().nextInt();
    MDC.clear();
    MDC.put("IP", String.valueOf(i));
    contextMap = MDC.getCopyOfContextMap();
    logger.debug(String.format("[%d] New Aspect", Thread.currentThread().getId()));
  }

  @Before("execution(Runnable+.new(..))")
  public void beforeNewRunnable(JoinPoint joinPoint) {
    MDC.setContextMap(contextMap);
    logger.debug(String.format("[%d] New Runnable", Thread.currentThread().getId()));
  }

  @Before("execution(* Runnable+.*(..))")
  public void before(JoinPoint joinPoint) {
    MDC.setContextMap(contextMap);
    logger.info(String.format("[%d] RUNNABLE WORKS!", Thread.currentThread().getId()));
  }

  @Before("execution(void Child.run())")
  public void beforeChildRun(JoinPoint joinPoint) {
    MDC.setContextMap(contextMap);
    logger.info(String.format("[%d] CHILD WORKS!", Thread.currentThread().getId()));
  }
}

And here's a Parent, Child and custom annotation

public class Parent {

  private final Logger logger = LoggerFactory.getLogger(Parent.class);
  private ExecutorService executorService;

  @MDCTrace
  public void runMultiThreadByExecutor() throws InterruptedException {
    executorService = Executors.newCachedThreadPool();
    logger.info(String.format("[%d] Before start child thread", Thread.currentThread().getId()));

    executorService.submit(new Child());
    logger.info(String.format("[%d] After start child thread", Thread.currentThread().getId()));

    List.of(10, 11, 12, 13, 14).parallelStream().forEach(i -> {
      logger.info(String.format("[%d] Loop iteration #%d", Thread.currentThread().getId(), i));
    });

    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.SECONDS);
    logger.info(String.format("[%d] ExecutorService is over", Thread.currentThread().getId()));
  }

  public static void main(String[] args) throws InterruptedException {
    Parent parent = new Parent();
    parent.runMultiThreadByExecutor();
  }
}
public class Child implements Runnable {

  private final Logger logger = LoggerFactory.getLogger(Child.class);

  @Override
  public void run() {
    logger.info(String.format("[%d] Running in the child thread", Thread.currentThread().getId()));
  }
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface MDCTrace {
}

Goal

Final goal is to simply have to annotate the entry point of a MDC context so any threads/runnables/futures created while executing the annotated method (even other objects) are intercepted so MDC is properly set using the original/parent thread MDC info stored in the aspect instance of current context flow.

Both tentatives before and beforeChildRun don't work and I can't find how to make one of them work.

Thank you

Bonus point if someone can guide me on how to also make this works for parallelStream too.


Solution

  • First you need to understand that a new thread is not within the control flow of its parent thread. See my other answers for an explanation incl. sample code and console log:

    Thus, anything related to cflow() or aspect instantiation percflow() will not work in this case, as you already noticed.

    The only way to get a part of what you need - at least for your own classes if you use compile-time weaving and also for third-party JARs/classes (except JRE classes) if you use load-time weaving - is manual bookkeeping.

    Look at this example, I modified your own code a bit in order to show a workaround and its limits. I also wanted to avoid using any logging framework and am printing to System.out instead. Thus I had to replace MDC by a dummy class in order to make the code compile.

    package de.scrum_master.app;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class MDC {
      private static ThreadLocal<Map<String, String>> contextMap = new InheritableThreadLocal<>();
    
      static {
        clear();
      }
    
      public static void clear() {
        contextMap.set(new HashMap<>());
      }
    
      public static void put(String key, String value) {
        contextMap.get().put(key, value);
      }
    
      public static Map<String, String> getCopyOfContextMap() {
        return new HashMap<>(contextMap.get());
      }
    
      public static void setContextMap(Map<String, String> contextMap) {
        MDC.contextMap.set(contextMap);
      }
    }
    
    package de.scrum_master.app;
    
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface MDCTrace {}
    
    package de.scrum_master.app;
    
    public class Child implements Runnable {
      @Override
      public void run() {
        System.out.println(String.format("[%d] Running in the child thread", Thread.currentThread().getId()));
      }
    }
    
    package de.scrum_master.app;
    
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class Parent {
      private ExecutorService executorService;
    
      @MDCTrace
      public Runnable runMultiThreadByExecutorAnnotated(Runnable runnable) throws InterruptedException {
        return doStuff(runnable);
      }
    
      @MDCTrace
      public Runnable runMultiThreadByExecutorAnnotated() throws InterruptedException {
        return doStuff();
      }
    
      public Runnable runMultiThreadByExecutorPlain() throws InterruptedException {
        return doStuff();
      }
    
      public Runnable runMultiThreadByExecutorPlain(Runnable runnable) throws InterruptedException {
        return doStuff(runnable);
      }
    
      private Runnable doStuff() throws InterruptedException {
        return doStuff(new Child());
      }
    
      private Runnable doStuff(Runnable runnable) throws InterruptedException {
        executorService = Executors.newCachedThreadPool();
        System.out.println(String.format("[%d] Before start child thread", Thread.currentThread().getId()));
    
        executorService.submit(runnable);
        System.out.println(String.format("[%d] After start child thread", Thread.currentThread().getId()));
    
        List.of(10, 11, 12, 13, 14).parallelStream().forEach(i -> {
          //System.out.println(String.format("[%d] Loop iteration #%d", Thread.currentThread().getId(), i));
        });
    
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println(String.format("[%d] ExecutorService is over", Thread.currentThread().getId()));
        System.out.println("\n----------------------------------------\n");
        return runnable;
      }
    
      public static void main(String[] args) throws InterruptedException {
        Parent parent = new Parent();
        System.out.println("MDCTrace annotation");
        parent.runMultiThreadByExecutorAnnotated();
        System.out.println("No annotation");
        parent.runMultiThreadByExecutorPlain();
    
        Runnable runnable = new Child();
        System.out.println("MDCTrace annotation (runnable created outside of control flow)");
        parent.runMultiThreadByExecutorAnnotated(runnable);
        System.out.println("No annotation (re-use runnable created outside of control flow)");
        parent.runMultiThreadByExecutorPlain(runnable);
    
        System.out.println("MDCTrace annotation (save returned runnable)");
        runnable = parent.runMultiThreadByExecutorAnnotated();
        System.out.println("No annotation (re-use returned runnable)");
        parent.runMultiThreadByExecutorPlain(runnable);
    }
    }
    

    As you can see I have a positive and a negative test example (with and without @MDCTrace annotation) and three cases for each of these:

    1. Creating runnables inside the control flow of the annotated (or non-annotated) method like you do in your own example.
    2. Creating runnables outside the control flow of the annotated (or non-annotated) method, passing them by reference into the control flow.
    3. Creating the first runnable inside the control flow of the annotated method, returning it and passing it into the control flow of the non-annotated method.

    Numbers 2 and 3 are there to demonstrate the limits of the subsequent aspect approach which mainly consists in doing manual bookkeeping of all Runnable instances created within the control flow of an annotated method.

    package de.scrum_master.aspect;
    
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Random;
    import java.util.Set;
    
    import org.aspectj.lang.JoinPoint;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Before;
    import org.aspectj.lang.annotation.Pointcut;
    
    import de.scrum_master.app.MDC;
    
    @Aspect
    public class MDCTraceAspect {
      private static final Random RANDOM = new Random(); 
      private Map<String, String> contextMap;
      private Set<Runnable> runnables = new HashSet<>();
    
      @Pointcut("@annotation(de.scrum_master.app.MDCTrace) && execution(* *(..))")
      private static void entryPoint() {}
    
      @Before("entryPoint()")
      public void executeEntryPoint() {
        MDC.clear();
        MDC.put("IP", String.valueOf(RANDOM.nextInt()));
        contextMap = MDC.getCopyOfContextMap();
        System.out.println(String.format("[%d] * Entry point", Thread.currentThread().getId()));
      }
    
      @Before("execution(Runnable+.new(..)) && cflow(entryPoint()) && target(runnable)")
      public void beforeNewRunnable(JoinPoint joinPoint, Runnable runnable) {
        runnables.add(runnable);
        MDC.setContextMap(contextMap);
        System.out.println(String.format("[%d] * New Runnable", Thread.currentThread().getId()));
      }
    
      @Before("execution(public void Runnable+.run(..)) && target(runnable)")
      public void beforeRunnableExecution(JoinPoint joinPoint, Runnable runnable) {
        if (!runnables.contains(runnable))
          return;
        MDC.setContextMap(contextMap);
        System.out.println(String.format("[%d] * Runnable started", Thread.currentThread().getId()));
      }
    }
    

    This yields the following console log (broken down into 3 parts):


    1. Creating runnables inside the control flow of the annotated (or non-annotated) method like you do in your own example:
    MDCTrace annotation
    [1] * Entry point
    [1] * New Runnable
    [1] Before start child thread
    [1] After start child thread
    [12] * Runnable started
    [12] Running in the child thread
    [1] ExecutorService is over
    
    ----------------------------------------
    
    No annotation
    [1] Before start child thread
    [1] After start child thread
    [13] Running in the child thread
    [1] ExecutorService is over
    
    ----------------------------------------
    
    

    This works as you might expect it. No surprises here.


    1. Creating runnables outside the control flow of the annotated (or non-annotated) method, passing them by reference into the control flow:
    MDCTrace annotation (runnable created outside of control flow)
    [1] * Entry point
    [1] Before start child thread
    [1] After start child thread
    [14] Running in the child thread
    [1] ExecutorService is over
    
    ----------------------------------------
    
    No annotation (re-use runnable created outside of control flow)
    [1] Before start child thread
    [1] After start child thread
    [15] Running in the child thread
    [1] ExecutorService is over
    
    ----------------------------------------
    
    

    As you can see, no log output here after the entry point has been reached. This is not what you might want, but the runnable has been created outside the control flow and passed in, so the aspect does not get triggered here.


    1. Creating the first runnable inside the control flow of the annotated method, returning it and passing it into the control flow of the non-annotated method:
    MDCTrace annotation (save returned runnable)
    [1] * Entry point
    [1] * New Runnable
    [1] Before start child thread
    [1] After start child thread
    [16] * Runnable started
    [16] Running in the child thread
    [1] ExecutorService is over
    
    ----------------------------------------
    
    No annotation (re-use returned runnable)
    [1] Before start child thread
    [1] After start child thread
    [17] * Runnable started
    [17] Running in the child thread
    [1] ExecutorService is over
    
    ----------------------------------------
    
    

    Here part A is like in case no. 1, but part B also prints a log line for the non-annotated method because the Runnable instance has been registered in the aspect's bookkeeping during the control flow of the annotated method. So here you see a log line you probably rather want to avoid.

    So what is the conclusion here? There is no perfect solution, you need to check your code and what cases you have there, then design the aspect to accommodate those cases. If you don't have cases like the ones I made up in no. 2 and 3, my approach works.

    Some other things to note:

    • Beware the difference of Runnables and Threads. They are not the same, you can re-use the same runnable in multiple threads. Furthermore, you can re-use threads too, e.g. by using thread pools. So this can get arbitrarily complex. Each runnable or thread marked as a target for your aspect could be re-used later in a context you don't want to log.
    • For parallel streams or other cases in which runnables are being created by the JRE itself this will never work because the runnables and threads created by internal JRE classes are not subject to aspect weaving, neither in the compile-time nor in the load-time weaving case. In theory you could weave aspect code into the JRE or JDK, creating new JARs from the woven classes and replacing the originals or prepending them to the boot classpath. But this is a bit complicated and you really need to control your application's execution environment in order to start the JVM with the right parameters. I did that before and it works, but this is not for beginners and probably a bad idea to begin with.