google-cloud-dataflow apache-beam logback slf4j

How to set up logback MDC in apache beam and dataflow?


We are using Apache Beam and would like to set up the logback MDC. The logback MDC is a great resource: when a request comes in, you store, say, a userId (in our case custId, fileId, and requestId), and from then on every log statement a developer writes is automatically stamped with that information. Developers no longer have to remember to add it to every log statement.

I am starting with an end-to-end integration-style test, with the Apache Beam Direct Runner embedded in our microservice for testing (in production, the microservice calls Dataflow). Currently, I see that the MDC is intact until after the expand() methods are called. Once the processElement methods are called, the context is of course gone, since I am on another thread.

So I am trying to fix this piece first. Where should I put this context so that I can restore it at the beginning of this thread?

As an example, if I have an Executor.execute(runnable), then I simply transfer the context by wrapping that runnable like so:

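    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.slf4j.MDC;

    public class MDCContextRunnable implements Runnable {
        private static final Logger log = LoggerFactory.getLogger(MDCContextRunnable.class);

        private final Map<String, String> mdcSnapshot;
        private final Runnable runnable;

        public MDCContextRunnable(Runnable runnable) {
            this.runnable = runnable;
            // Capture the MDC of the submitting thread (the copy may be null)
            this.mdcSnapshot = MDC.getCopyOfContextMap();
        }

        @Override
        public void run() {
            try {
                // Restore the captured MDC on the executing thread
                if (mdcSnapshot != null) {
                    MDC.setContextMap(mdcSnapshot);
                }
                runnable.run();
            } catch (RuntimeException e) {
                // Must log errors before the MDC is cleared
                log.error("message", e); // Logs the error together with the MDC
            } finally {
                MDC.clear();
            }
        }
    }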

So I basically need to do the same with Apache Beam. I need to:

  1. Have a point to capture the MDC
  2. Have a point to restore the MDC
  3. Have a point to clear out the MDC to prevent it from leaking into another request (really in case I missed something, which seems to happen now and then)

Any ideas on how to do this?

Oh, and bonus points if the MDC can be there when any exceptions are logged by the framework! (Ideally, frameworks are supposed to do this for you, but Apache Beam does not seem to. Most web frameworks have this built in.)

thanks, Dean


Solution

  • Based on the context and examples you gave, it sounds like you want to use MDC to automatically capture more information for your own DoFns. Your best bet is to create your MDC context in the StartBundle/FinishBundle or Setup/Teardown methods on your DoFns, choosing between the two based on how long you need the context to be available (see this answer for an explanation of the differences between the two). The important thing is that these methods are executed for each instance of a DoFn, meaning they will be called on the new threads created to execute these DoFns.

    Under the Hood

    I should explain what's happening here and how this approach differs from your original goal. When you run an Apache Beam program, the pipeline code you wrote executes on your own machine and performs pipeline construction (which is where all the expand calls occur). Once the pipeline is constructed, it is sent to a runner, which often executes in a separate application (unless it is the Direct Runner), and the runner then either executes your user code directly or runs it in a Docker environment.

    In your original approach it makes sense that you would successfully apply MDC to all logs until execution begins, because execution might not only be occurring in a different thread, but potentially also a different application or machine. However, the methods described above are executed as part of your user code, so setting up your MDC there will allow it to function on whatever thread/application/machine is executing transforms.
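To make the thread/process boundary concrete, here is a minimal stdlib-only sketch. It does not use MDC or Beam: `CONTEXT` is a hypothetical ThreadLocal stand-in for MDC, and `Step` is a hypothetical stand-in for a DoFn that captures a snapshot in its constructor and is then serialized, roughly as a runner would ship user code. It only illustrates why per-thread context vanishes while a snapshot stored in a serializable field survives.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextBoundaryDemo {
    // Hypothetical stand-in for MDC: like MDC, values are stored per thread.
    static final ThreadLocal<HashMap<String, String>> CONTEXT =
        ThreadLocal.withInitial(HashMap::new);

    // Hypothetical stand-in for a DoFn: serializable user code whose
    // constructor runs on the thread that still has the request context.
    static class Step implements Serializable {
        private static final long serialVersionUID = 1L;
        final HashMap<String, String> snapshot;

        Step() {
            this.snapshot = new HashMap<>(CONTEXT.get());
        }
    }

    // Serialize and deserialize, imitating user code being shipped to a runner.
    static Step roundTrip(Step step) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(step);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Step) in.readObject();
        }
    }

    // Returns { value seen through the thread-local, value seen through the snapshot },
    // both read on a worker thread.
    static String[] run() throws Exception {
        CONTEXT.get().put("requestId", "r-1");
        Step shipped = roundTrip(new Step());
        Callable<String[]> task = () -> new String[] {
            CONTEXT.get().get("requestId"),
            shipped.snapshot.get("requestId")
        };
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return worker.submit(task).get();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] seen = run();
        System.out.println("thread-local on worker: " + seen[0]); // null
        System.out.println("snapshot on worker: " + seen[1]);     // r-1
    }
}
```

The takeaway is that whatever has to reach the executing thread needs to travel inside the serialized user code rather than in thread-local state.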

    Just keep in mind that those methods get called for every DoFn and you will often have multiple DoFns per thread, which is something you may need to be wary of depending on how MDC works.
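Putting the pieces together, the three points from the question (capture, restore, clear) could be sketched with the bundle lifecycle methods roughly as follows. This is a minimal sketch, not a tested recipe: `MdcAwareFn` and its String element types are hypothetical, and it assumes the DoFn is constructed on the thread that still holds the request's MDC and that SLF4J is backed by logback.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

// Hypothetical DoFn; the name and the String element types are illustrative.
class MdcAwareFn extends DoFn<String, String> {
    private static final Logger log = LoggerFactory.getLogger(MdcAwareFn.class);

    // 1. Capture: the constructor runs during pipeline construction. HashMap is
    // Serializable, so the snapshot travels with the DoFn to the runner.
    private final HashMap<String, String> mdcSnapshot;

    MdcAwareFn() {
        Map<String, String> current = MDC.getCopyOfContextMap(); // may be null
        this.mdcSnapshot = current == null ? new HashMap<>() : new HashMap<>(current);
    }

    // 2. Restore: replace the executing thread's MDC before elements are processed.
    @StartBundle
    public void startBundle() {
        MDC.setContextMap(mdcSnapshot);
    }

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> out) {
        log.info("processing {}", element); // stamped with the restored MDC
        out.output(element);
    }

    // 3. Clear: avoid leaking the context into unrelated work on this thread.
    @FinishBundle
    public void finishBundle() {
        MDC.clear();
    }
}
```

Two caveats with this sketch. If several DoFns share a thread, the first finishBundle to run clears the MDC for the others as well, so restoring at the start of processElement may be the safer choice. And FinishBundle is not invoked when processing fails, so the context can linger on that thread until the next startBundle replaces it.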