Spring Integration ExecutorChannel running on Same Caller Thread


We need to process messages sequentially, regardless of how many threads spring.task.scheduling.pool.size defines. To that end, we defined an ExecutorChannel backed by a single-thread executor. However, the logs show the messages being processed in parallel on the caller's threads. Please suggest how to process the messages sequentially without blocking the caller thread.

@Bean
public MessageChannel svcErrorChannel() {
   return new ExecutorChannel(Executors.newSingleThreadExecutor());
}

return IntegrationFlows.from(svcErrorChannel())                                             
                       .log(ERROR, m -> "ErrorFlow Initiated: " + m.getPayload())
                

Application Logs:

2023-02-04 20:21:03,407 [boundedElastic-1          ] ERROR o.s.i.h.LoggingHandler - 1c710133ada428f0 ErrorFlow Initiated: org.springframework.messaging.MessageHandlingException: xxxxxxxxxxxxxxxx
2023-02-04 20:21:03,407 [boundedElastic-2          ] ERROR o.s.i.h.LoggingHandler - 1c710133ada428f0 ErrorFlow Initiated: org.springframework.messaging.MessageHandlingException: xxxxxxxxxxxxxxxxx

Solution

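  • The log() operator is essentially a ChannelInterceptor whose logging happens in the preSend() hook, which runs on the producer side of a MessageChannel. Because log() comes directly after from(svcErrorChannel()), the interceptor sits on svcErrorChannel itself and fires on the sending thread, before the message is handed to the executor. It is therefore expected to see your Reactor (boundedElastic) threads in those logs.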

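    If you want to log the consumed (not produced) messages, use .handle(new LoggingHandler(LoggingHandler.Level.ERROR)) instead of the log() operator. Alternatively, put a .bridge() before your log(), so that the logging interceptor lands on a channel that is fed from the executor thread rather than from the caller.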

    See docs for more info: https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl-log
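
The producer-side versus consumer-side distinction above can be modelled in plain Java. This is only a minimal sketch, not Spring Integration code: ExecutorChannelModel, its preSendHook and handler fields, and the svc-error-1 thread name are hypothetical names made up for the illustration. It assumes the channel calls its interceptor on the sending thread and only then hands the message to a single-thread executor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for an executor-backed channel; NOT a Spring Integration class.
final class ExecutorChannelModel {

    // One worker thread, so hand-offs are processed one at a time in submission order.
    private final ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "svc-error-1"));

    private final Consumer<String> preSendHook; // models an interceptor's preSend()
    private final Consumer<String> handler;     // models the subscribed consumer

    ExecutorChannelModel(Consumer<String> preSendHook, Consumer<String> handler) {
        this.preSendHook = preSendHook;
        this.handler = handler;
    }

    // Returns as soon as the task is queued; the caller is not blocked by the handler.
    void send(String payload) {
        preSendHook.accept(payload);                      // producer side: caller's thread
        executor.execute(() -> handler.accept(payload));  // consumer side: worker thread
    }

    // Stops accepting work and waits briefly for queued messages to drain.
    boolean shutdownAndWait() {
        executor.shutdown();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> lines = Collections.synchronizedList(new ArrayList<>());
        ExecutorChannelModel channel = new ExecutorChannelModel(
                p -> lines.add("preSend " + p + " on " + Thread.currentThread().getName()),
                p -> lines.add("handle  " + p + " on " + Thread.currentThread().getName()));

        channel.send("error-1");
        channel.send("error-2");
        channel.shutdownAndWait();

        lines.forEach(System.out::println);
    }
}
```

In this model the preSend lines carry the caller's thread name, while the handle lines all carry svc-error-1 and appear in send order. In the flow from the question, log() placed directly after from(svcErrorChannel()) plays the role of preSendHook; a .bridge() before it, or a .handle(...) with a LoggingHandler, moves the logging to the handler side.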