We have a high-load Apache Camel application that utilizes logback/MDC for logging information. We are finding that some of the MDC info is stale on threads as forewarned in logback's documentation. I found this SO question that addresses this concern:
How to use MDC with thread pools?
How should we apply this to our Camel application to avoid stale info? Is there a simple way to globally change the default ThreadPoolExecutor to a custom variation as suggested in the linked question? I see you can do it for the pools themselves, but I didn't see any examples for the executor. Keep in mind that our application is quite large and services a high volume of orders daily, so I'd like the impact on the existing application to be as small as possible.
I figured it out and wanted to post what I did in case it benefits someone else. Please note that I'm using JDK 6 and Camel 2.13.2.
Camel has a DefaultExecutorServiceManager that uses a DefaultThreadPoolFactory. I extended the default factory into an MdcThreadPoolFactory.
The DefaultThreadPoolFactory has methods to generate RejectableThreadPoolExecutors and RejectableScheduledThreadPoolExecutors. I extended both of these into Mdc* versions that override the execute() method to wrap the Runnable and hand off the MDC info between threads (as specified by the link in my original question).
I created a bean instance of the MdcThreadPoolFactory in my application configuration; it is automatically picked up by Camel and used by the ExecutorServiceManager.
MdcThreadPoolExecutor:
package com.mypackage.concurrent;
import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.*;
/**
* An SLF4J MDC-compatible {@link ThreadPoolExecutor}.
* <p/>
* In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
* logging. However, although MDC data is passed to child threads, this doesn't work when threads are reused in a
* thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} that sets MDC data appropriately before each task.
* <p/>
* Created by broda20.
* Date: 10/29/15
*/
public class MdcThreadPoolExecutor extends RejectableThreadPoolExecutor {
@SuppressWarnings("unchecked")
private Map<String, Object> getContextForTask() {
return MDC.getCopyOfContextMap();
}
public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
* All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
* all delegate to this.
*/
@Override
public void execute(Runnable command) {
super.execute(wrap(command, getContextForTask()));
}
public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
return new Runnable() {
@Override
public void run() {
Map previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
runnable.run();
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
}
};
}
}
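If you want a quick sanity check of the wrapping outside of Camel, a small standalone snippet like the one below should print the submitter's MDC value from the pooled worker thread. The MdcWrapCheck class name and the orderId key are purely illustrative, not part of the application code.
MdcWrapCheck (illustrative):
package com.mypackage.concurrent;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.MDC;
public class MdcWrapCheck {
    public static void main(String[] args) throws Exception {
        MdcThreadPoolExecutor pool = new MdcThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        // populate MDC on the submitting thread
        MDC.put("orderId", "12345");
        pool.execute(new Runnable() {
            @Override
            public void run() {
                // prints 12345 because execute() wrapped the task with the caller's context
                System.out.println("orderId on worker thread = " + MDC.get("orderId"));
            }
        });
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}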
MdcScheduledThreadPoolExecutor:
package com.mypackage.concurrent;
import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.*;
/**
* An SLF4J MDC-compatible {@link ScheduledThreadPoolExecutor}.
* <p/>
* In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
* logging. However, although MDC data is passed to child threads, this doesn't work when threads are reused in a
* thread pool. This is a drop-in replacement for {@link ScheduledThreadPoolExecutor} that sets MDC data appropriately before each task.
* <p/>
* Created by broda20.
* Date: 10/29/15
*/
public class MdcScheduledThreadPoolExecutor extends RejectableScheduledThreadPoolExecutor {
@SuppressWarnings("unchecked")
private Map<String, Object> getContextForTask() {
return MDC.getCopyOfContextMap();
}
public MdcScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize);
}
public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, threadFactory);
}
public MdcScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
super(corePoolSize, handler);
}
public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, threadFactory, handler);
}
/**
* All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
* all delegate to this.
*/
@Override
public void execute(Runnable command) {
super.execute(wrap(command, getContextForTask()));
}
public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
return new Runnable() {
@Override
public void run() {
Map previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
runnable.run();
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
}
};
}
}
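Note that the two executors above carry identical copies of wrap(). If you would rather maintain that logic in one place, you could pull it into a small shared helper like the sketch below (MdcRunnableWrapper is just a suggested name, not a Camel or SLF4J class) and have both execute() overrides call it instead.
MdcRunnableWrapper (illustrative):
package com.mypackage.concurrent;
import java.util.Map;
import org.slf4j.MDC;
/**
 * Shared MDC hand-off logic so both Mdc* executors stay in sync.
 */
public final class MdcRunnableWrapper {
    private MdcRunnableWrapper() {
        // utility class, no instances
    }
    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}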
MdcThreadPoolFactory:
package com.mypackage.concurrent;
import org.apache.camel.impl.DefaultThreadPoolFactory;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.*;
public class MdcThreadPoolFactory extends DefaultThreadPoolFactory {
@SuppressWarnings("unchecked")
private Map<String, Object> getContextForTask() {
return MDC.getCopyOfContextMap();
}
public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, boolean allowCoreThreadTimeOut,
RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) throws IllegalArgumentException {
// the core pool size must be 0 or higher
if (corePoolSize < 0) {
throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize);
}
// validate max >= core
if (maxPoolSize < corePoolSize) {
throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
}
BlockingQueue<Runnable> workQueue;
if (corePoolSize == 0 && maxQueueSize <= 0) {
// use a synchronous queue for direct-handover (no tasks stored on the queue)
workQueue = new SynchronousQueue<Runnable>();
// and force 1 as pool size to be able to create the thread pool by the JDK
corePoolSize = 1;
maxPoolSize = 1;
} else if (maxQueueSize <= 0) {
// use a synchronous queue for direct-handover (no tasks stored on the queue)
workQueue = new SynchronousQueue<Runnable>();
} else {
// bounded task queue to store tasks on the queue
workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
}
ThreadPoolExecutor answer = new MdcThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
answer.setThreadFactory(threadFactory);
answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
if (rejectedExecutionHandler == null) {
rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
}
answer.setRejectedExecutionHandler(rejectedExecutionHandler);
return answer;
}
@Override
public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
if (rejectedExecutionHandler == null) {
rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
}
ScheduledThreadPoolExecutor answer = new MdcScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
//JDK7: answer.setRemoveOnCancelPolicy(true);
// need to wrap the thread pool in a sized to guard against the problem that the
// JDK created thread pool has an unbounded queue (see class javadoc), which mean
// we could potentially keep adding tasks, and run out of memory.
if (profile.getMaxPoolSize() > 0) {
return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
} else {
return answer;
}
}
}
And finally, the bean instance:
<bean id="mdcThreadPoolFactory" class="com.mypackage.concurrent.MdcThreadPoolFactory"/>
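If you bootstrap the CamelContext in Java rather than Spring XML, the same registration can presumably be done programmatically by setting the factory on the ExecutorServiceManager before the context starts, roughly like this (a sketch only; adapt it to however you build your context):
Programmatic registration (illustrative):
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
import com.mypackage.concurrent.MdcThreadPoolFactory;
public class CamelBootstrap {
    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        // register the MDC-aware factory before any routes or thread pools are created
        context.getExecutorServiceManager().setThreadPoolFactory(new MdcThreadPoolFactory());
        // context.addRoutes(new MyRouteBuilder()); // add your routes here
        context.start();
    }
}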