Tags: spring-boot, apache-kafka, spring-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Spring Cloud Stream Multi Cluster and Multi Input Bindings from a Single Consumer


In my project, I need to connect to two different Kafka brokers and consume all events of two topics, one topic on each Kafka broker.

My application.yaml looks somewhat like this:

spring:
  cloud:
    function:
      definition: orderCreatedListener;orderProcessedListener
    stream:
      bindings:
        orderCreatedProducer-out-0:
          destination: order-created
          binder: kafka-one
        orderCreatedListener-in-0:
          destination: order-created
          group: spot
          binder: kafka-one
        orderCreatedListener-out-0:
          destination: order-processed
          binder: kafka-two # Switched manually between kafka-one and kafka-two for testing; orderProcessedListener-in-1 has no dedicated producer
        orderProcessedListener-in-0: # CONSUME FROM KAFKA ONE
          destination: order-processed
          group: spot
          binder: kafka-one
        orderProcessedListener-in-1: # CONSUME FROM KAFKA TWO
          destination: order-processed
          group: spot
          binder: kafka-two
      kafka:
        binder:
          auto-create-topics: true
          configuration:
            security:
              protocol: SASL_PLAINTEXT
            sasl:
              mechanism: PLAIN
        bindings:
          orderCreatedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-created-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedListener-in-1:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
      binders:
        kafka-one:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"
        kafka-two:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"

But this doesn't work when I run the application. The consumer binding for Kafka One (orderProcessedListener-in-0) works normally, but the consumer binding for Kafka Two (orderProcessedListener-in-1) never consumes anything.

I am using:

  • Spring Boot 3.3.0
  • Spring Cloud 2023.0.1

Both Kafka clusters run fine in my development environment as Docker containers, one exposed on port 9092 and the other on port 9093.

Architecture example: [image: project architecture example]

Kafka One, with consumers subscribed on all topics: [image]

Kafka Two, where no consumer ever subscribes on any topic: [image]

How can I fix this?


Solution

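  • A plain Consumer exposes only one input binding, -in-0, so the orderProcessedListener-in-1 binding is never attached to anything. A simple way to solve this is to define two function beans that both delegate to a single entry-point method.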

    Example:

    
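    import java.util.function.Consumer;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    @Component
    public class OrderProcessedListener {
    
        // Single entry point shared by both function beans.
        public void consume(final Message<Order> message) {
            // your business logic to process message
        }
    
        // Bound to orderProcessedFromKafkaOneListener-in-0 (binder kafka-one).
        @Bean
        public Consumer<Message<Order>> orderProcessedFromKafkaOneListener() {
            return this::consume;
        }
    
        // Bound to orderProcessedFromKafkaTwoListener-in-0 (binder kafka-two).
        @Bean
        public Consumer<Message<Order>> orderProcessedFromKafkaTwoListener() {
            return this::consume;
        }
    
    }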
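
    The delegation itself does not depend on Spring. As a minimal plain-Java sketch (the SharedHandlerSketch class, its Order record, and the processed list are hypothetical names used only for illustration), two distinct Consumer instances can forward to the same method, so each one can be bound separately while the logic stays in one place:

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    // Plain-Java sketch (no Spring): two Consumer instances share one handler method.
    final class SharedHandlerSketch {

        // Hypothetical stand-in for the Order payload used in the example above.
        record Order(String id) {}

        // Collects processed ids so the effect of the shared handler is observable.
        final List<String> processed = new ArrayList<>();

        // Single entry point, the counterpart of consume(...) above.
        void consume(Order order) {
            processed.add(order.id());
        }

        // Counterpart of orderProcessedFromKafkaOneListener.
        Consumer<Order> fromKafkaOne() {
            return this::consume;
        }

        // Counterpart of orderProcessedFromKafkaTwoListener.
        Consumer<Order> fromKafkaTwo() {
            return this::consume;
        }

        public static void main(String[] args) {
            SharedHandlerSketch sketch = new SharedHandlerSketch();
            sketch.fromKafkaOne().accept(new Order("a-1"));
            sketch.fromKafkaTwo().accept(new Order("b-2"));
            System.out.println(sketch.processed); // prints [a-1, b-2]
        }
    }
    ```

    In the real application, Spring Cloud Stream invokes each Consumer bean from its own binding, so the two accept calls in main only simulate messages arriving from the two clusters.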
    

    The properties configuration should be:

    spring:
      cloud:
        function:
          definition: orderCreatedListener;orderProcessedFromKafkaOneListener;orderProcessedFromKafkaTwoListener
        stream:
          bindings:
            orderCreatedProducer-out-0:
              destination: order-created
              binder: kafka-one
            orderCreatedListener-in-0:
              destination: order-created
              group: spot
              binder: kafka-one
            orderCreatedListener-out-0:
              destination: order-processed
              binder: kafka-two
            orderProcessedFromKafkaOneListener-in-0:
              destination: order-processed
              group: spot
              binder: kafka-one
            orderProcessedFromKafkaTwoListener-in-0:
              destination: order-processed
              group: spot
              binder: kafka-two
          kafka:
            binder:
              auto-create-topics: true
              configuration:
                security:
                  protocol: SASL_PLAINTEXT
                sasl:
                  mechanism: PLAIN
            bindings:
              orderCreatedListener-in-0:
                consumer:
                  enableDlq: true
                  dlqName: order-created-dlq
                  autoCommitOnError: true
                  autoCommitOffset: true
              orderProcessedFromKafkaOneListener-in-0:
                consumer:
                  enableDlq: true
                  dlqName: order-processed-dlq
                  autoCommitOnError: true
                  autoCommitOffset: true
              orderProcessedFromKafkaTwoListener-in-0:
                consumer:
                  enableDlq: true
                  dlqName: order-processed-dlq
                  autoCommitOnError: true
                  autoCommitOffset: true
          binders:
            kafka-one:
              type: kafka
              environment:
                spring:
                  cloud:
                    stream:
                      kafka:
                        binder:
                          brokers: localhost:9092
                          configuration:
                            sasl:
                              jaas:
                                config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"
            kafka-two:
              type: kafka
              environment:
                spring:
                  cloud:
                    stream:
                      kafka:
                        binder:
                          brokers: localhost:9093
                          configuration:
                            sasl:
                              jaas:
                                config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"
    

    However, the best solution for this is to use reactive functions, which support multiple input bindings on a single function.
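
To see why the number of declared inputs matters, recall that Spring Cloud Stream names input bindings <functionName>-in-<index>, with one index per input the function declares. The following is a minimal sketch of that naming convention only, not framework code; BindingNameSketch and inputBindingNames are hypothetical names introduced for illustration:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the <functionName>-in-<index> naming convention; not framework code.
final class BindingNameSketch {

    // Returns one binding name per declared input, indexed from 0.
    static List<String> inputBindingNames(String functionName, int inputCount) {
        List<String> names = new ArrayList<>();
        for (int index = 0; index < inputCount; index++) {
            names.add(functionName + "-in-" + index);
        }
        return names;
    }

    public static void main(String[] args) {
        // A plain Consumer<Message<Order>> declares a single input.
        System.out.println(inputBindingNames("orderProcessedListener", 1));
        // prints [orderProcessedListener-in-0]

        // A reactive function taking a Tuple2 of two Flux inputs declares two.
        System.out.println(inputBindingNames("orderProcessedListener", 2));
        // prints [orderProcessedListener-in-0, orderProcessedListener-in-1]
    }
}
```

With a single-input Consumer, no binding named orderProcessedListener-in-1 exists, so the properties under that key in the original configuration are simply never used. A reactive function with two inputs would make both names valid on one bean.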