Need to process messages sequentially, irrespective of the spring.task.scheduling.pool.size
threads defined. Hence, we defined a ExecutorChannel
with single thread. However, we see the messages are processed parallelly by the caller's thread. Please suggest how to process the messages sequentially without blocking the caller thread.
@Bean
public MessageChannel svcErrorChannel() {
return new ExecutorChannel(Executors.newSingleThreadExecutor());
}
return IntegrationFlows.from(svcErrorChannel())
.log(ERROR, m -> "ErrorFlow Initiated: " + m.getPayload())
Application Logs:
2023-02-04 20:21:03,407 [boundedElastic-1 ] ERROR o.s.i.h.LoggingHandler - 1c710133ada428f0 ErrorFlow Initiated: org.springframework.messaging.MessageHandlingException: xxxxxxxxxxxxxxxx
2023-02-04 20:21:03,407 [boundedElastic-2 ] ERROR o.s.i.h.LoggingHandler - 1c710133ada428f0 ErrorFlow Initiated: org.springframework.messaging.MessageHandlingException: xxxxxxxxxxxxxxxxx
The log()
operator is essentially a ChannelInterceptor
where that logging happens in a preSend()
hook - the part of a MessageChannel
on producer side. Therefore it is expected to see your Reactor threads in those logs.
If you really would like to log consumed (not produced) messages, then you need to use a .handle(new LoggingHandler())
instead of log()
operator. Or you can use a .bridge()
before your log()
.
See docs for more info: https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl-log