java spring-boot spring-integration mdc

Spring Integration: transfer MDC in error flow


I'm using Spring Integration.

Most of my flows work on DirectChannels, which ensure synchronous processing of events. However, I've noticed that when an error occurs in one of these flows, the error flow is triggered on a separate thread.

My logging logic relies heavily on MDC properties, so I'm not able to trace the errors back to the source flow.

@Configuration
public class GenericErrorFlow {

    public static final String CHANNEL = "errorChannel";

    @Bean
    QueueChannel errorChannel() {
        return new QueueChannel(500);
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerSpec poller() {
        return Pollers.fixedRate(500)
                .errorChannel(CHANNEL);
    }
}

@Configuration
public class ExampleFlow {

    public static final String CHANNEL = "exampleFlowChannel";

    @Bean(name = CHANNEL)
    public MessageChannel getChannel() {
        DirectChannel dc = new DirectChannel();
        dc.setComponentName(CHANNEL);
        return dc;
    }

    @Bean
    public IntegrationFlow exampleFlow() {
        return IntegrationFlows.from(CHANNEL)
                .<ConsumerRecord>handle((payload, headers) -> {
                    // perform some logic here
                    return null; // no reply: the flow ends at this handler
                }).get();
    }
}

@MessagingGateway (errorChannel = GenericErrorFlow.CHANNEL)
public interface MessageGateway {
    // other routings here
}

Below are the possible approaches I could think of (in order of preference):

  1. Define some custom thread execution logic and transfer the MDC of the thread that runs exampleFlow to the errorFlow thread.
  2. Disable the async behaviour of error routing so that both exampleFlow and errorFlow run on the same thread. In this case there would be no need for MDC propagation.
  3. Serialize the MDC in some standard format, pass it along with the message as one of the headers, and deserialize it in the subsequent thread (this feels like more of a hack; I'm not sure message headers were intended to be used this way).

Please suggest which of these I should go with (an entirely new approach is also fine). I would also appreciate some pointers on how to implement approaches 1 and 2.


Solution

  • Your errorChannel is declared as a QueueChannel. It is therefore pollable, and its messages are processed by scheduled tasks. The TaskScheduler threads that run those tasks know nothing about the producing thread, so idea #1 cannot be implemented.

    The second idea is the simplest: it is enough to make your errorChannel a DirectChannel.

    The third one is perfectly fine, and it is exactly what we do in the ThreadStatePropagationChannelInterceptor. You just need to implement this interceptor and specify which thread state to propagate (and how) from the producer to the consumer over that QueueChannel.

    NOTE: it is not OK to name your own error channel errorChannel; that name is reserved for the global default error channel:

    https://docs.spring.io/spring-integration/reference/channel/special-channels.html

    https://docs.spring.io/spring-integration/reference/error-handling.html
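
As a rough illustration of the third idea, the following is a minimal sketch of a ThreadStatePropagationChannelInterceptor subclass that carries the SLF4J MDC map across the QueueChannel. It assumes spring-integration and an MDC-capable SLF4J binding are on the classpath. The names MdcPropagationChannelInterceptor, ErrorFlowChannelConfig and flowErrorChannel are hypothetical and not part of the original question or of Spring Integration.

```java
import java.util.Map;

import org.slf4j.MDC;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.interceptor.ThreadStatePropagationChannelInterceptor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

// Hypothetical interceptor: the propagated state is a copy of the sender's MDC map.
class MdcPropagationChannelInterceptor
        extends ThreadStatePropagationChannelInterceptor<Map<String, String>> {

    @Override
    protected Map<String, String> obtainPropagatingContext(Message<?> message, MessageChannel channel) {
        // Called on the sending (producer) thread. The result may be null
        // (for example when no MDC values are set); then nothing is attached.
        return MDC.getCopyOfContextMap();
    }

    @Override
    protected void populatePropagatedContext(Map<String, String> state, Message<?> message,
            MessageChannel channel) {
        // Called on the receiving (poller) thread before the message is handled.
        if (state != null) {
            MDC.setContextMap(state);
        } else {
            MDC.clear();
        }
    }
}

@Configuration
class ErrorFlowChannelConfig {

    // Hypothetical channel name, chosen to avoid the reserved "errorChannel".
    public static final String CHANNEL = "flowErrorChannel";

    @Bean(name = CHANNEL)
    QueueChannel flowErrorChannel() {
        QueueChannel channel = new QueueChannel(500);
        // Register the interceptor so every message sent to this channel
        // carries the sender's MDC to whichever thread polls it.
        channel.addInterceptor(new MdcPropagationChannelInterceptor());
        return channel;
    }
}
```

Poller threads are reused, so MDC values restored for one error message stay on that thread until they are overwritten. Calling MDC.clear() at the end of the error handler is one way to avoid stale values. For the second idea, no interceptor is needed: returning a DirectChannel from the bean method instead of a QueueChannel keeps error handling on the calling thread.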