
Camel MDC Logback Stale Info Under Volume


We have a high-load Apache Camel application that uses Logback and MDC for logging. We're finding that some MDC values are stale on reused threads, as Logback's documentation warns. I found this SO question that addresses this concern:

How to use MDC with thread pools?

How should we apply this to our Camel application to avoid stale info? Is there a simple way to globally replace the default ThreadPoolExecutor with a custom variant, as suggested in the linked question? I see you can do it for the pools themselves, but I didn't see any examples for the executor. Keep in mind that our application is quite large and handles a high volume of orders every day, so I'd like the change to have as little impact on the existing application as possible.
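Here is a minimal, JDK-only reproduction of the stale-context problem. It assumes MDC behaves like an `InheritableThreadLocal`; the `FakeMdc` class below is a hypothetical stand-in, not SLF4J or Logback code. The pool's worker thread copies the caller's context once, when the thread is created, and keeps reporting that snapshot for every later task it runs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for an inheritable MDC: each thread has its own map,
// and a new thread starts with a copy of its creator's map.
final class FakeMdc {
    private static final InheritableThreadLocal<Map<String, String>> CTX =
            new InheritableThreadLocal<Map<String, String>>() {
                @Override
                protected Map<String, String> initialValue() {
                    return new HashMap<String, String>();
                }

                @Override
                protected Map<String, String> childValue(Map<String, String> parent) {
                    return new HashMap<String, String>(parent);
                }
            };

    static void put(String key, String value) { CTX.get().put(key, value); }

    static String get(String key) { return CTX.get().get(key); }
}

final class StaleMdcDemo {
    /** Returns the orderId a pooled task observes while the caller is handling "order-2". */
    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        final AtomicReference<String> seen = new AtomicReference<String>();
        try {
            FakeMdc.put("orderId", "order-1");
            // The worker thread is created here and inherits a snapshot containing order-1.
            pool.submit(new Runnable() { public void run() { } }).get();

            FakeMdc.put("orderId", "order-2"); // the caller moves on to the next order
            pool.submit(new Runnable() {
                public void run() { seen.set(FakeMdc.get("orderId")); }
            }).get();
            return seen.get(); // the reused worker still reports order-1
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("pooled task saw orderId=" + run());
    }
}
```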


Solution

  • I figured it out and wanted to post what I did in case it benefits someone else. Note that I'm using JDK 6 and Camel 2.13.2.

    • Camel has a DefaultExecutorServiceManager that uses a DefaultThreadPoolFactory. I extended the default factory into an MdcThreadPoolFactory.

    • The DefaultThreadPoolFactory has methods that create RejectableThreadPoolExecutors and RejectableScheduledThreadPoolExecutors. I extended both into Mdc* versions that wrap each submitted task and hand off the MDC info between threads (as described in the question linked above).

    • I created a bean instance of the MdcThreadPoolFactory in my application configuration; Camel picks it up automatically and uses it in the ExecutorServiceManager.
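Stripped of the Camel classes, the hand-off in these steps is: capture the submitting thread's MDC inside `execute()`, install it in the worker before the task runs, and restore the worker's previous map afterwards. The JDK-only sketch below assumes a hypothetical `MdcStandIn` class (a plain `ThreadLocal`) in place of `org.slf4j.MDC`, mirroring the MDC calls the wrapper uses. Because `AbstractExecutorService.submit()` wraps the task and then calls `execute()`, overriding `execute()` alone also covers `submit()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for org.slf4j.MDC backed by a plain (non-inheritable) ThreadLocal.
final class MdcStandIn {
    private static final ThreadLocal<Map<String, String>> CTX = new ThreadLocal<Map<String, String>>();

    static Map<String, String> getCopyOfContextMap() {
        Map<String, String> m = CTX.get();
        return m == null ? null : new HashMap<String, String>(m);
    }

    static void setContextMap(Map<String, String> m) { CTX.set(new HashMap<String, String>(m)); }

    static void clear() { CTX.remove(); }

    static String get(String key) {
        Map<String, String> m = CTX.get();
        return m == null ? null : m.get(key);
    }
}

// Single-threaded so every task reuses the same worker thread.
class ContextPropagatingExecutor extends ThreadPoolExecutor {
    ContextPropagatingExecutor() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    public void execute(final Runnable task) {
        final Map<String, String> context = MdcStandIn.getCopyOfContextMap(); // captured on the submitting thread
        super.execute(new Runnable() {
            public void run() {
                Map<String, String> previous = MdcStandIn.getCopyOfContextMap();
                if (context == null) MdcStandIn.clear(); else MdcStandIn.setContextMap(context);
                try {
                    task.run();
                } finally {
                    // Restore whatever the executing thread had before this task.
                    if (previous == null) MdcStandIn.clear(); else MdcStandIn.setContextMap(previous);
                }
            }
        });
    }
}

final class PropagationDemo {
    /** Returns the orderId seen by three submit() calls made under order-1, order-2 and no context. */
    static String[] run() {
        ContextPropagatingExecutor pool = new ContextPropagatingExecutor();
        Callable<String> readOrderId = new Callable<String>() {
            public String call() { return MdcStandIn.get("orderId"); }
        };
        try {
            Map<String, String> ctx = new HashMap<String, String>();
            ctx.put("orderId", "order-1");
            MdcStandIn.setContextMap(ctx);
            String first = pool.submit(readOrderId).get();

            ctx.put("orderId", "order-2");
            MdcStandIn.setContextMap(ctx);
            String second = pool.submit(readOrderId).get(); // same worker, fresh context

            MdcStandIn.clear();
            String third = pool.submit(readOrderId).get(); // nothing left over from earlier tasks
            return new String[] {first, second, third};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // prints [order-1, order-2, null]
    }
}
```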

    MdcThreadPoolExecutor:

    package com.mypackage.concurrent;
    
    import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
    import org.slf4j.MDC;
    
    import java.util.Map;
    import java.util.concurrent.*;
    
    /**
     * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
     * <p/>
     * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
     * logging. Depending on the MDC implementation, that data may be copied to child threads, but this doesn't help when
     * threads are reused in a thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} that sets the
     * submitter's MDC data before each task and restores the worker's previous MDC data afterwards.
     * <p/>
     * Created by broda20.
     * Date: 10/29/15
     */
    public class MdcThreadPoolExecutor extends RejectableThreadPoolExecutor {
    
        // SLF4J versions differ (raw Map vs. Map<String, String>); this compiles against either.
        @SuppressWarnings("unchecked")
        private static Map<String, String> getContextForTask() {
            return MDC.getCopyOfContextMap();
        }
    
        public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    
        public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }
    
        public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }
    
        public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                     RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    
        /**
         * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
         * all delegate to this.
         */
        @Override
        public void execute(Runnable command) {
            super.execute(wrap(command, getContextForTask()));
        }
    
        public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
            return new Runnable() {
                @Override
                public void run() {
                    Map<String, String> previous = getContextForTask();
                    if (context == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(context);
                    }
                    try {
                        runnable.run();
                    } finally {
                        // Restore rather than clear: under CallerRunsPolicy this is the submitting thread.
                        if (previous == null) {
                            MDC.clear();
                        } else {
                            MDC.setContextMap(previous);
                        }
                    }
                }
            };
        }
    }
    

    MdcScheduledThreadPoolExecutor:

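    package com.mypackage.concurrent;
    
    import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
    import org.slf4j.MDC;
    
    import java.util.Map;
    import java.util.concurrent.*;
    
    /**
     * A SLF4J MDC-compatible {@link ScheduledThreadPoolExecutor}.
     * <p/>
     * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
     * logging. Depending on the MDC implementation, that data may be copied to child threads, but this doesn't help when
     * threads are reused in a thread pool. This is a drop-in replacement for {@link ScheduledThreadPoolExecutor} that sets
     * the scheduling thread's MDC data before each run of a task and restores the worker's previous MDC data afterwards.
     * <p/>
     * Created by broda20.
     * Date: 10/29/15
     */
    public class MdcScheduledThreadPoolExecutor extends RejectableScheduledThreadPoolExecutor {
    
        // SLF4J versions differ (raw Map vs. Map<String, String>); this compiles against either.
        @SuppressWarnings("unchecked")
        private static Map<String, String> getContextForTask() {
            return MDC.getCopyOfContextMap();
        }
    
        public MdcScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize);
        }
    
        public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
            super(corePoolSize, threadFactory);
        }
    
        public MdcScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
            super(corePoolSize, handler);
        }
    
        public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, threadFactory, handler);
        }
    
        /*
         * In ScheduledThreadPoolExecutor, execute() and submit() delegate to schedule(), but schedule(),
         * scheduleAtFixedRate() and scheduleWithFixedDelay() never call execute(). Overriding the four
         * scheduling methods (instead of execute()) therefore injects MDC on every submission path.
         */
        @Override
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            return super.schedule(wrap(command, getContextForTask()), delay, unit);
        }
    
        @Override
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            return super.schedule(wrap(callable, getContextForTask()), delay, unit);
        }
    
        @Override
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
            return super.scheduleAtFixedRate(wrap(command, getContextForTask()), initialDelay, period, unit);
        }
    
        @Override
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
            return super.scheduleWithFixedDelay(wrap(command, getContextForTask()), initialDelay, delay, unit);
        }
    
        public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
            return new Runnable() {
                @Override
                public void run() {
                    Map<String, String> previous = install(context);
                    try {
                        runnable.run();
                    } finally {
                        install(previous);
                    }
                }
            };
        }
    
        public static <V> Callable<V> wrap(final Callable<V> callable, final Map<String, String> context) {
            return new Callable<V>() {
                @Override
                public V call() throws Exception {
                    Map<String, String> previous = install(context);
                    try {
                        return callable.call();
                    } finally {
                        install(previous);
                    }
                }
            };
        }
    
        // Sets the given MDC context (clearing MDC if it is null) and returns the context it replaced.
        private static Map<String, String> install(Map<String, String> context) {
            Map<String, String> previous = getContextForTask();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            return previous;
        }
    }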
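One caveat when adapting the wrapper to a scheduled pool: in `java.util.concurrent.ScheduledThreadPoolExecutor`, `execute()` and `submit()` delegate to `schedule()`, but `schedule()`, `scheduleAtFixedRate()` and `scheduleWithFixedDelay()` never call `execute()`. A subclass that overrides only `execute()` therefore misses tasks that are scheduled directly. The JDK-only check below (the `CountingScheduler` and `ScheduleBypassDemo` names are hypothetical) counts calls to an `execute()` override.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass that only counts calls to execute().
class CountingScheduler extends ScheduledThreadPoolExecutor {
    final AtomicInteger executeCalls = new AtomicInteger();

    CountingScheduler() {
        super(1);
    }

    @Override
    public void execute(Runnable command) {
        executeCalls.incrementAndGet();
        super.execute(command);
    }
}

final class ScheduleBypassDemo {
    /** Returns {execute() calls after schedule(), execute() calls after a further execute()}. */
    static int[] run() {
        CountingScheduler scheduler = new CountingScheduler();
        Runnable noop = new Runnable() { public void run() { } };
        try {
            scheduler.schedule(noop, 10, TimeUnit.MILLISECONDS).get(); // bypasses execute()
            int afterSchedule = scheduler.executeCalls.get();
            scheduler.execute(noop); // goes through the override
            int afterExecute = scheduler.executeCalls.get();
            return new int[] {afterSchedule, afterExecute};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] calls = run();
        System.out.println("after schedule(): " + calls[0] + ", after execute(): " + calls[1]);
    }
}
```

Running it reports 0 calls after `schedule()` and 1 after `execute()`, which is why an MDC-aware scheduled pool needs to hook the scheduling methods rather than `execute()`.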
    

    MdcThreadPoolFactory:

    package com.mypackage.concurrent;
    
    import org.apache.camel.impl.DefaultThreadPoolFactory;
    import org.apache.camel.spi.ThreadPoolProfile;
    import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
    
    import java.util.concurrent.*;
    
    /**
     * Creates the MDC-aware executors instead of Camel's default Rejectable* executors. Only the two methods below are
     * overridden; any executor that the inherited methods create without going through them will not propagate MDC.
     */
    public class MdcThreadPoolFactory extends DefaultThreadPoolFactory {
    
        @Override
        public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
                                             int maxQueueSize, boolean allowCoreThreadTimeOut,
                                             RejectedExecutionHandler rejectedExecutionHandler,
                                             ThreadFactory threadFactory) throws IllegalArgumentException {
            // the core pool size must be 0 or higher
            if (corePoolSize < 0) {
                throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize);
            }
    
            // validate max >= core
            if (maxPoolSize < corePoolSize) {
                throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " < " + corePoolSize);
            }
    
            BlockingQueue<Runnable> workQueue;
            if (corePoolSize == 0 && maxQueueSize <= 0) {
                // use a synchronous queue for direct-handover (no tasks stored on the queue)
                workQueue = new SynchronousQueue<Runnable>();
                // and force 1 as pool size to be able to create the thread pool by the JDK
                corePoolSize = 1;
                maxPoolSize = 1;
            } else if (maxQueueSize <= 0) {
                // use a synchronous queue for direct-handover (no tasks stored on the queue)
                workQueue = new SynchronousQueue<Runnable>();
            } else {
                // bounded task queue to store tasks on the queue
                workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
            }
    
            ThreadPoolExecutor answer = new MdcThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
            answer.setThreadFactory(threadFactory);
            answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
            if (rejectedExecutionHandler == null) {
                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
            }
            answer.setRejectedExecutionHandler(rejectedExecutionHandler);
            return answer;
        }
    
        @Override
        public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
            RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
            if (rejectedExecutionHandler == null) {
                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
            }
    
            ScheduledThreadPoolExecutor answer = new MdcScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
            //JDK7: answer.setRemoveOnCancelPolicy(true);
    
            // need to wrap the thread pool in a sized executor to guard against the problem that the
            // JDK created thread pool has an unbounded queue (see the ScheduledThreadPoolExecutor javadoc),
            // which means we could potentially keep adding tasks, and run out of memory.
            if (profile.getMaxPoolSize() > 0) {
                return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
            } else {
                return answer;
            }
        }
    }
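Both factory methods fall back to `ThreadPoolExecutor.CallerRunsPolicy`, which runs a rejected task on the thread that submitted it. That is one reason the MDC wrapper restores the previous map in its `finally` block instead of simply clearing it: under caller-runs, "previous" is the caller's own context. The JDK-only sketch below assumes a hypothetical `Ctx` class (a `ThreadLocal` stand-in for MDC) and a one-thread pool with a `SynchronousQueue`, as in the factory's direct hand-off branch. With the only worker busy, the second task is rejected and runs on the caller, whose context survives.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical ThreadLocal stand-in for org.slf4j.MDC.
final class Ctx {
    private static final ThreadLocal<Map<String, String>> MAP = new ThreadLocal<Map<String, String>>();

    static Map<String, String> copy() {
        Map<String, String> m = MAP.get();
        return m == null ? null : new HashMap<String, String>(m);
    }

    static void set(Map<String, String> m) {
        if (m == null) MAP.remove(); else MAP.set(new HashMap<String, String>(m));
    }

    static String get(String key) {
        Map<String, String> m = MAP.get();
        return m == null ? null : m.get(key);
    }
}

final class CallerRunsDemo {
    // Same shape as the MDC wrapper: install the captured context, run, then restore the previous one.
    static Runnable wrap(final Runnable task, final Map<String, String> context) {
        return new Runnable() {
            public void run() {
                Map<String, String> previous = Ctx.copy();
                Ctx.set(context);
                try {
                    task.run();
                } finally {
                    Ctx.set(previous); // restoring (not clearing) keeps the caller's context intact
                }
            }
        };
    }

    /** Returns {name of the thread that ran the rejected task, caller's orderId afterwards}. */
    static String[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
        final CountDownLatch release = new CountDownLatch(1);
        final AtomicReference<String> ranOn = new AtomicReference<String>();
        Map<String, String> mine = new HashMap<String, String>();
        mine.put("orderId", "order-7");
        Ctx.set(mine);
        try {
            // Keep the only worker busy so the next direct hand-off is rejected.
            pool.execute(wrap(new Runnable() {
                public void run() {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, Ctx.copy()));
            // Rejected, so CallerRunsPolicy runs the wrapper on this thread.
            pool.execute(wrap(new Runnable() {
                public void run() { ranOn.set(Thread.currentThread().getName()); }
            }, Ctx.copy()));
            return new String[] {ranOn.get(), Ctx.get("orderId")};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] r = run();
        System.out.println("ran on " + r[0] + ", caller orderId=" + r[1]);
    }
}
```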
    

    And finally, the bean instance:

    <bean id="mdcThreadPoolFactory" class="com.mypackage.concurrent.MdcThreadPoolFactory"/>