I was reading about Spring Integration's FluxMessageChannel
here and here, but I still don't understand exactly what are the differences between using a DirectChannel
and FluxMessageChannel
when using Project Reactor. Since the DirectChannel is stateless and controlled by its pollers, I'd expect the FluxMessageChannel
to not be needed. I'm trying to understand when exactly should I use each and why, when speaking on Reactive Streams applications that are implemented with Spring Integration.
I currently have a reactive project that uses DirectChannel
, and it seems to work fine, even the documentation says:
the flow behavior is changed from an imperative push model to a reactive pull model
I'd like to understand when to use each of the channels and what is the exact difference when working with Reactive Streams.
The DirectChannel
does not have any poller and its implementation is very simple: as long as a message is sent to it, the handler is called. In the same caller's thread:
public class DirectChannel extends AbstractSubscribableChannel {
private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();
private volatile Integer maxSubscribers;
/**
* Create a channel with default {@link RoundRobinLoadBalancingStrategy}.
*/
public DirectChannel() {
this(new RoundRobinLoadBalancingStrategy());
}
Where that UnicastingDispatcher
is:
public final boolean dispatch(final Message<?> message) {
if (this.executor != null) {
Runnable task = createMessageHandlingTask(message);
this.executor.execute(task);
return true;
}
return this.doDispatch(message);
}
(There is no executor
option for the DirectChannel
)
private boolean doDispatch(Message<?> message) {
if (tryOptimizedDispatch(message)) {
return true;
}
...
protected boolean tryOptimizedDispatch(Message<?> message) {
MessageHandler handler = this.theOneHandler;
if (handler != null) {
try {
handler.handleMessage(message);
return true;
}
catch (Exception e) {
throw IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
() -> "Dispatcher failed to deliver Message", e);
}
}
return false;
}
That's why I call it " imperative push model". The caller is this case is going to wait until the handler finishes its job. And if you have a big flow, everything is going to be stopped in the sender thread until a sent message has reached the end of the flow of direct channels. In two simple words: the publisher is in charge for the whole execution and it is blocked in this case. You haven't faced any problems with your solution based on the DirectChannel
just because you didn't use reactive non-blocking threads yet like Netty in WebFlux or MongoDB reactive driver.
The FluxMessageChannel
was really designed for Reactive Streams purposes where the subscriber is in charge for handling a message which it pulls from the Flux
on demand. This way just after sending the publisher is free to do anything else. Just because it is already a subscriber responsibility to handle the message.
I would say it is definitely OK to use DirectChannel
as long as your handlers are not blocking. As long as they are blocking you should go with FluxMessageChannel
. Although don't forget that there are other channel types for different tasks: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations