Search code examples
spring-integrationspring-integration-dsl

Spring Integration channel() distributes messages to unexpected subscribers


currently I facing some issue with Spring IntegrationFlows and the channel() method. I'm not sure what I have miss understood something or if there is a bug in the framework.

Here is the Flow:

@Configuration
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class ImportFlow {


    ConsumerFactory<String, String> kafkaConsumerFactory;
    ServiceProperties serviceProperties;

    DebugHandler debugHandler;

    DebugSecondHandler secondHandler;

    TombstoneHandler tombstoneHandler;

    @Bean
    public StandardIntegrationFlow startKafkaInbound() {
        return IntegrationFlow.from(Kafka
                .messageDrivenChannelAdapter(
                    kafkaConsumerFactory,
                    ListenerMode.record,
                    serviceProperties.getImportTopic().getName())
            )
            .channel(Objects.requireNonNull(routeKafkaMessage().getInputChannel()))
            .get();
    }

    @Bean
    public IntegrationFlow routeKafkaMessage() {
        return IntegrationFlow.from("routeKafkaMessage.input")
            .route(Message.class,
                p -> {
                    if (p.getPayload().equals(KafkaNull.INSTANCE)) {
                        return processTombstoneFlow().getInputChannel();
                    }
                    return someSplittingFlow().getInputChannel();
                })
            .get();
    }

    @Bean
    IntegrationFlow someSplittingFlow() {
        return IntegrationFlow.from("someSplittingFlow.input")
            .handle(debugHandler)
            .channel(sendToFlow().getInputChannel())
            .get();
    }


    @Bean
    IntegrationFlow processTombstoneFlow() {
        return IntegrationFlow.from("processTombstoneFlow.input")
            .handle(tombstoneHandler)
            .channel(sendToFlow().getInputChannel())
        .handle(unexpectedHandler) // Whatever you do here besides a direct get() will lead to an error 
            // .nullChannel()
            .handle(m -> {})
            .get();
    }


    @Bean
    IntegrationFlow sendToFlow() {
        return IntegrationFlow.from("sendToFlow.input")
            .handle(secondHandler)
            .handle(m -> {}, e -> e.id("endOfSendToFlow"))
            .get();
    }

}

If I send 100 messages with Payload to kafka only 50 messages are received by the secondHandler in the sendToFlow. The other 50 messages are received by the unexpectedHandler Somehow I expected, that all messages would be send to the sendToFlow.

From the logs it can be seen that each second massage was send to the tombstoneFlow:

org.springframework.integration.channel.DirectChannel: preSend on channel 'bean 'processTombstoneFlow.channel#0'

It worked also vice versa. If I send 100 Messages with null (KafkaNull) only the half reaching the sencondHandler.

I was expecting that there are two DirectChannel only connecting someSplittingFlow<->sendToFlow or processTombstoneFlow<->sendToFlow

What I do not understand is how and why someSplittingFlow and processTombstoneFlow get connected.

I also added once a third channel to send something to the sendToFlow. Then only 33% of the messages reached the expected channel.

I put the demo app here


Solution

  • The default MessageChannel type in Spring Integration is a DirectChannel. That is, for example, a result of your IntegrationFlow.from("sendToFlow.input"). This channel comes with a round-robin strategy which means that subscribed message handlers are going to get every message for only one next subscriber.

    In your case that sendToFlow.input got one subscriber from that sendToFlow(), essentially yours .handle(secondHandler). And then you do this in the processTombstoneFlow():

        .channel(sendToFlow().getInputChannel())
        .handle(unexpectedHandler)
    

    Which means that you subscribe this unexpectedHandler as a second one for the same channel. Therefore your round-robin behavior.

    We need to know more about your logic to determine some steps how to fix the situation.

    See more info in docs about that channel: https://docs.spring.io/spring-integration/reference/channel/implementations.html#channel-implementations-directchannel