I am currently facing an issue with Spring Integration's IntegrationFlow and the channel() method. I'm not sure whether I have misunderstood something or whether there is a bug in the framework.
Here is the flow:
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class ImportFlow {
ConsumerFactory<String, String> kafkaConsumerFactory;
ServiceProperties serviceProperties;
DebugHandler debugHandler;
DebugSecondHandler secondHandler;
TombstoneHandler tombstoneHandler;
public StandardIntegrationFlow startKafkaInbound() {
return IntegrationFlow.from(Kafka
public IntegrationFlow routeKafkaMessage() {
return IntegrationFlow.from("routeKafkaMessage.input")
p -> {
if (p.getPayload().equals(KafkaNull.INSTANCE)) {
return processTombstoneFlow().getInputChannel();
return someSplittingFlow().getInputChannel();
IntegrationFlow someSplittingFlow() {
return IntegrationFlow.from("someSplittingFlow.input")
IntegrationFlow processTombstoneFlow() {
return IntegrationFlow.from("processTombstoneFlow.input")
.handle(unexpectedHandler) // Whatever you do here besides a direct get() will lead to an error
// .nullChannel()
.handle(m -> {})
IntegrationFlow sendToFlow() {
return IntegrationFlow.from("sendToFlow.input")
.handle(m -> {}, e ->"endOfSendToFlow"))
If I send 100 messages with a payload to Kafka, only 50 of them are received by the secondHandler in the sendToFlow. The other 50 are received by the unexpectedHandler. I expected all messages to be sent to the sendToFlow.
The logs show that every second message was sent to the tombstoneFlow: preSend on channel 'bean ''
It also happens the other way around: if I send 100 messages with null (KafkaNull), only half of them reach the secondHandler.
I was expecting two DirectChannels, each connecting only someSplittingFlow<->sendToFlow or processTombstoneFlow<->sendToFlow.
What I do not understand is how and why someSplittingFlow and processTombstoneFlow get connected.
I also tried adding a third channel that sends to the sendToFlow. Then only 33% of the messages reached the expected channel.
I put the demo app here
The default MessageChannel type in Spring Integration is a DirectChannel. That is, for example, what your IntegrationFlow.from("sendToFlow.input") results in. This channel comes with a round-robin strategy, which means that each message is delivered to only one of the subscribed message handlers, and the subscribers take turns.
In your case, sendToFlow.input got one subscriber from sendToFlow(), essentially your .handle(secondHandler). Then, in processTombstoneFlow(), you add .handle(unexpectedHandler) after that same channel, which means that you subscribe this unexpectedHandler as a second subscriber to it. Hence the round-robin behavior.
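To illustrate the effect, here is a minimal plain-Java sketch of round-robin dispatch. It is only a simplified model, not Spring Integration code: SketchChannel, RoundRobinSketch and dispatch are hypothetical names invented for this example, and the real DirectChannel does more (load-balancing strategy, failover and so on). It only shows why two subscribers on one channel each see half of the messages, and three subscribers roughly a third each.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RoundRobinSketch {

    /** Simplified stand-in for a point-to-point channel; NOT the Spring Integration class. */
    static final class SketchChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private int next = 0;

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        /** Delivers the message to exactly one subscriber, then advances to the next one. */
        void send(String message) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException("no subscribers");
            }
            Consumer<String> target = subscribers.get(next);
            next = (next + 1) % subscribers.size();
            target.accept(message);
        }
    }

    /** Sends messageCount messages and returns how many each subscriber received. */
    static int[] dispatch(int messageCount, int subscriberCount) {
        SketchChannel channel = new SketchChannel();
        int[] received = new int[subscriberCount];
        for (int i = 0; i < subscriberCount; i++) {
            final int index = i;
            channel.subscribe(m -> received[index]++);
        }
        for (int i = 0; i < messageCount; i++) {
            channel.send("message-" + i);
        }
        return received;
    }

    public static void main(String[] args) {
        int[] two = dispatch(100, 2);   // e.g. secondHandler and unexpectedHandler
        System.out.println(two[0] + " / " + two[1]);                      // prints 50 / 50
        int[] three = dispatch(100, 3); // a third subscriber on the same channel
        System.out.println(three[0] + " / " + three[1] + " / " + three[2]); // prints 34 / 33 / 33
    }
}
```

The split depends only on how many handlers are subscribed to the channel, not on which flow sent the message, which matches the 50% and 33% figures from the question.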
We would need to know more about your logic to suggest how to fix the situation.
See more info in docs about that channel: