Original issue
Populate the Java MDC from a thread to all its spawned inner threads (parent to children relation)
WIP solution using AspectJ
I'm able to write an aspect intercepting all Runnable
creation but since I want a different aspect instance for each use (with a custom annotation) as I have to store the MDC somewhere when executing code from the parent thread, I'm unable to write a pointcut intercepting the newly created instance of Runnable
so I can set the MDC using the previous context map.
Here's the aspect
@Aspect("percflow(@annotation(com.bell.cts.commons.cron.framework.scheduler.domain.MDCTrace))")
public class MDCTraceAspect {
private final Logger logger = LoggerFactory.getLogger(MDCTraceAspect.class);
private int i;
private final Map<String, String> contextMap;
public MDCTraceAspect() {
i = new Random().nextInt();
MDC.clear();
MDC.put("IP", String.valueOf(i));
contextMap = MDC.getCopyOfContextMap();
logger.debug(String.format("[%d] New Aspect", Thread.currentThread().getId()));
}
@Before("execution(Runnable+.new(..))")
public void beforeNewRunnable(JoinPoint joinPoint) {
MDC.setContextMap(contextMap);
logger.debug(String.format("[%d] New Runnable", Thread.currentThread().getId()));
}
@Before("execution(* Runnable+.*(..))")
public void before(JoinPoint joinPoint) {
MDC.setContextMap(contextMap);
logger.info(String.format("[%d] RUNNABLE WORKS!", Thread.currentThread().getId()));
}
@Before("execution(void Child.run())")
public void beforeChildRun(JoinPoint joinPoint) {
MDC.setContextMap(contextMap);
logger.info(String.format("[%d] CHILD WORKS!", Thread.currentThread().getId()));
}
}
And here's a Parent
, Child
and custom annotation
public class Parent {
private final Logger logger = LoggerFactory.getLogger(Parent.class);
private ExecutorService executorService;
@MDCTrace
public void runMultiThreadByExecutor() throws InterruptedException {
executorService = Executors.newCachedThreadPool();
logger.info(String.format("[%d] Before start child thread", Thread.currentThread().getId()));
executorService.submit(new Child());
logger.info(String.format("[%d] After start child thread", Thread.currentThread().getId()));
List.of(10, 11, 12, 13, 14).parallelStream().forEach(i -> {
logger.info(String.format("[%d] Loop iteration #%d", Thread.currentThread().getId(), i));
});
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.SECONDS);
logger.info(String.format("[%d] ExecutorService is over", Thread.currentThread().getId()));
}
public static void main(String[] args) throws InterruptedException {
Parent parent = new Parent();
parent.runMultiThreadByExecutor();
}
}
public class Child implements Runnable {
private final Logger logger = LoggerFactory.getLogger(Child.class);
@Override
public void run() {
logger.info(String.format("[%d] Running in the child thread", Thread.currentThread().getId()));
}
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface MDCTrace {
}
Goal
Final goal is to simply have to annotate the entry point of a MDC context so any threads/runnables/futures created while executing the annotated method (even other objects) are intercepted so MDC is properly set using the original/parent thread MDC info stored in the aspect instance of current context flow.
Both tentatives before
and beforeChildRun
don't work and I can't find how to make one of them work.
Thank you
Bonus point if someone can guide me on how to also make this works for parallelStream
too.
First you need to understand that a new thread is not within the control flow of its parent thread. See my other answers for an explanation incl. sample code and console log:
Thus, anything related to cflow()
or aspect instantiation percflow()
will not work in this case, as you already noticed.
The only way to get a part of what you need - at least for your own classes if you use compile-time weaving and also for third-party JARs/classes (except JRE classes) if you use load-time weaving - is manual bookkeeping.
Look at this example, I modified your own code a bit in order to show a workaround and its limits. I also wanted to avoid using any logging framework and am printing to System.out
instead. Thus I had to replace MDC
by a dummy class in order to make the code compile.
package de.scrum_master.app;
import java.util.HashMap;
import java.util.Map;
public class MDC {
private static ThreadLocal<Map<String, String>> contextMap = new InheritableThreadLocal<>();
static {
clear();
}
public static void clear() {
contextMap.set(new HashMap<>());
}
public static void put(String key, String value) {
contextMap.get().put(key, value);
}
public static Map<String, String> getCopyOfContextMap() {
return new HashMap<>(contextMap.get());
}
public static void setContextMap(Map<String, String> contextMap) {
MDC.contextMap.set(contextMap);
}
}
package de.scrum_master.app;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface MDCTrace {}
package de.scrum_master.app;
public class Child implements Runnable {
@Override
public void run() {
System.out.println(String.format("[%d] Running in the child thread", Thread.currentThread().getId()));
}
}
package de.scrum_master.app;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Parent {
private ExecutorService executorService;
@MDCTrace
public Runnable runMultiThreadByExecutorAnnotated(Runnable runnable) throws InterruptedException {
return doStuff(runnable);
}
@MDCTrace
public Runnable runMultiThreadByExecutorAnnotated() throws InterruptedException {
return doStuff();
}
public Runnable runMultiThreadByExecutorPlain() throws InterruptedException {
return doStuff();
}
public Runnable runMultiThreadByExecutorPlain(Runnable runnable) throws InterruptedException {
return doStuff(runnable);
}
private Runnable doStuff() throws InterruptedException {
return doStuff(new Child());
}
private Runnable doStuff(Runnable runnable) throws InterruptedException {
executorService = Executors.newCachedThreadPool();
System.out.println(String.format("[%d] Before start child thread", Thread.currentThread().getId()));
executorService.submit(runnable);
System.out.println(String.format("[%d] After start child thread", Thread.currentThread().getId()));
List.of(10, 11, 12, 13, 14).parallelStream().forEach(i -> {
//System.out.println(String.format("[%d] Loop iteration #%d", Thread.currentThread().getId(), i));
});
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.SECONDS);
System.out.println(String.format("[%d] ExecutorService is over", Thread.currentThread().getId()));
System.out.println("\n----------------------------------------\n");
return runnable;
}
public static void main(String[] args) throws InterruptedException {
Parent parent = new Parent();
System.out.println("MDCTrace annotation");
parent.runMultiThreadByExecutorAnnotated();
System.out.println("No annotation");
parent.runMultiThreadByExecutorPlain();
Runnable runnable = new Child();
System.out.println("MDCTrace annotation (runnable created outside of control flow)");
parent.runMultiThreadByExecutorAnnotated(runnable);
System.out.println("No annotation (re-use runnable created outside of control flow)");
parent.runMultiThreadByExecutorPlain(runnable);
System.out.println("MDCTrace annotation (save returned runnable)");
runnable = parent.runMultiThreadByExecutorAnnotated();
System.out.println("No annotation (re-use returned runnable)");
parent.runMultiThreadByExecutorPlain(runnable);
}
}
As you can see I have a positive and a negative test example (with and without @MDCTrace
annotation) and three cases for each of these:
Numbers 2 and 3 are there to demonstrate the limits of the subsequent aspect approach which mainly consists in doing manual bookkeeping of all Runnable
instances created within the control flow of an annotated method.
package de.scrum_master.aspect;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import de.scrum_master.app.MDC;
@Aspect
public class MDCTraceAspect {
private static final Random RANDOM = new Random();
private Map<String, String> contextMap;
private Set<Runnable> runnables = new HashSet<>();
@Pointcut("@annotation(de.scrum_master.app.MDCTrace) && execution(* *(..))")
private static void entryPoint() {}
@Before("entryPoint()")
public void executeEntryPoint() {
MDC.clear();
MDC.put("IP", String.valueOf(RANDOM.nextInt()));
contextMap = MDC.getCopyOfContextMap();
System.out.println(String.format("[%d] * Entry point", Thread.currentThread().getId()));
}
@Before("execution(Runnable+.new(..)) && cflow(entryPoint()) && target(runnable)")
public void beforeNewRunnable(JoinPoint joinPoint, Runnable runnable) {
runnables.add(runnable);
MDC.setContextMap(contextMap);
System.out.println(String.format("[%d] * New Runnable", Thread.currentThread().getId()));
}
@Before("execution(public void Runnable+.run(..)) && target(runnable)")
public void beforeRunnableExecution(JoinPoint joinPoint, Runnable runnable) {
if (!runnables.contains(runnable))
return;
MDC.setContextMap(contextMap);
System.out.println(String.format("[%d] * Runnable started", Thread.currentThread().getId()));
}
}
This yields the following console log (broken down into 3 parts):
MDCTrace annotation
[1] * Entry point
[1] * New Runnable
[1] Before start child thread
[1] After start child thread
[12] * Runnable started
[12] Running in the child thread
[1] ExecutorService is over
----------------------------------------
No annotation
[1] Before start child thread
[1] After start child thread
[13] Running in the child thread
[1] ExecutorService is over
----------------------------------------
This works as you might expect it. No surprises here.
MDCTrace annotation (runnable created outside of control flow)
[1] * Entry point
[1] Before start child thread
[1] After start child thread
[14] Running in the child thread
[1] ExecutorService is over
----------------------------------------
No annotation (re-use runnable created outside of control flow)
[1] Before start child thread
[1] After start child thread
[15] Running in the child thread
[1] ExecutorService is over
----------------------------------------
As you can see, no log output here after the entry point has been reached. This is not what you might want, but the runnable has been created outside the control flow and passed in, so the aspect does not get triggered here.
MDCTrace annotation (save returned runnable)
[1] * Entry point
[1] * New Runnable
[1] Before start child thread
[1] After start child thread
[16] * Runnable started
[16] Running in the child thread
[1] ExecutorService is over
----------------------------------------
No annotation (re-use returned runnable)
[1] Before start child thread
[1] After start child thread
[17] * Runnable started
[17] Running in the child thread
[1] ExecutorService is over
----------------------------------------
Here part A is like in case no. 1, but part B also prints a log line for the non-annotated method because the Runnable
instance has been registered in the aspect's bookkeeping during the control flow of the annotated method. So here you see a log line you probably rather want to avoid.
So what is the conclusion here? There is no perfect solution, you need to check your code and what cases you have there, then design the aspect to accommodate those cases. If you don't have cases like the ones I made up in no. 2 and 3, my approach works.
Some other things to note:
Runnable
s and Thread
s. They are not the same, you can re-use the same runnable in multiple threads. Furthermore, you can re-use threads too, e.g. by using thread pools. So this can get arbitrarily complex. Each runnable or thread marked as a target for your aspect could be re-used later in a context you don't want to log.