currently I facing some issue with Spring IntegrationFlows and the channel() method. I'm not sure what I have miss understood something or if there is a bug in the framework.
Here is the Flow:
@Configuration
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class ImportFlow {
ConsumerFactory<String, String> kafkaConsumerFactory;
ServiceProperties serviceProperties;
DebugHandler debugHandler;
DebugSecondHandler secondHandler;
TombstoneHandler tombstoneHandler;
@Bean
public StandardIntegrationFlow startKafkaInbound() {
return IntegrationFlow.from(Kafka
.messageDrivenChannelAdapter(
kafkaConsumerFactory,
ListenerMode.record,
serviceProperties.getImportTopic().getName())
)
.channel(Objects.requireNonNull(routeKafkaMessage().getInputChannel()))
.get();
}
@Bean
public IntegrationFlow routeKafkaMessage() {
return IntegrationFlow.from("routeKafkaMessage.input")
.route(Message.class,
p -> {
if (p.getPayload().equals(KafkaNull.INSTANCE)) {
return processTombstoneFlow().getInputChannel();
}
return someSplittingFlow().getInputChannel();
})
.get();
}
@Bean
IntegrationFlow someSplittingFlow() {
return IntegrationFlow.from("someSplittingFlow.input")
.handle(debugHandler)
.channel(sendToFlow().getInputChannel())
.get();
}
@Bean
IntegrationFlow processTombstoneFlow() {
return IntegrationFlow.from("processTombstoneFlow.input")
.handle(tombstoneHandler)
.channel(sendToFlow().getInputChannel())
.handle(unexpectedHandler) // Whatever you do here besides a direct get() will lead to an error
// .nullChannel()
.handle(m -> {})
.get();
}
@Bean
IntegrationFlow sendToFlow() {
return IntegrationFlow.from("sendToFlow.input")
.handle(secondHandler)
.handle(m -> {}, e -> e.id("endOfSendToFlow"))
.get();
}
}
If I send 100 messages with Payload to kafka only 50 messages are received by the secondHandler in the sendToFlow. The other 50 messages are received by the unexpectedHandler Somehow I expected, that all messages would be send to the sendToFlow.
From the logs it can be seen that each second massage was send to the tombstoneFlow:
org.springframework.integration.channel.DirectChannel: preSend on channel 'bean 'processTombstoneFlow.channel#0'
It worked also vice versa. If I send 100 Messages with null (KafkaNull) only the half reaching the sencondHandler.
I was expecting that there are two DirectChannel only connecting someSplittingFlow<->sendToFlow or processTombstoneFlow<->sendToFlow
What I do not understand is how and why someSplittingFlow and processTombstoneFlow get connected.
I also added once a third channel to send something to the sendToFlow. Then only 33% of the messages reached the expected channel.
I put the demo app here
The default MessageChannel
type in Spring Integration is a DirectChannel
. That is, for example, a result of your IntegrationFlow.from("sendToFlow.input")
. This channel comes with a round-robin strategy which means that subscribed message handlers are going to get every message for only one next subscriber.
In your case that sendToFlow.input
got one subscriber from that sendToFlow()
, essentially yours .handle(secondHandler)
. And then you do this in the processTombstoneFlow()
:
.channel(sendToFlow().getInputChannel())
.handle(unexpectedHandler)
Which means that you subscribe this unexpectedHandler
as a second one for the same channel. Therefore your round-robin behavior.
We need to know more about your logic to determine some steps how to fix the situation.
See more info in docs about that channel: https://docs.spring.io/spring-integration/reference/channel/implementations.html#channel-implementations-directchannel