spring-cloud-stream, spring-cloud-stream-binder-kafka

How to enable function routing with two different input topics


I am migrating an application to the new function-based programming model of Spring Cloud Stream, but I am blocked on event routing.

I have to route events coming from two different Kafka topics, and I don't see how to bind functionRouter-in-0 to two different destinations.

Routing would be done by adding a spring.cloud.function.definition header to each message on the producer side.

Let's say I have

  • 4 consumer functions in my service: consumerA, consumerB, consumerC, consumerD
  • 2 Kafka topics:
    • topic1, whose messages carry the spring.cloud.function.definition header with value consumerA or consumerB
    • topic2, whose messages carry the header with value consumerC or consumerD

How can I express in the configuration that the RoutingFunction.FUNCTION_NAME binding should listen to both topic1 and topic2?

spring:
  cloud:
    function.definition: consumerA;consumerB;consumerC;consumerD
    stream:
      function.routing.enabled: true
      bindings:
        functionRouter-in-0:
          destination: topic1
          group: myService
          consumer:
            concurrency: 1
            partitioned: false
            maxAttempts: 1
        functionRouter-in-0:   # <== this obviously does not work because it's already defined above
          destination: topic2
          group: myService
          consumer:
            concurrency: 3
            maxAttempts: 1

Note: I'm on version 3.0.11.RELEASE.


Solution

  • With regard to bindings, nothing has changed from the annotation-based model. As explained in the documentation: "If binding represents a consumer binding (input), it could be bound to multiple destinations, and the destination names can be specified as comma-separated String values." So:

    . . .
    functionRouter-in-0:
      destination: topic1, topic2
      group: myService
    . . .
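
    Applied to the configuration from the question, the router binding might look like the sketch below. It is only a sketch under a few assumptions: the property names are the ones used in the question, the rest of the configuration is left out, and since a binding carries a single set of consumer properties, the two per-topic concurrency values (1 and 3) cannot both be kept. The value 3 here is an arbitrary pick.

    ```yaml
    spring:
      cloud:
        stream:
          function.routing.enabled: true
          bindings:
            functionRouter-in-0:
              # One input binding subscribed to both topics (comma-separated)
              destination: topic1,topic2
              group: myService
              consumer:
                # Assumption: a single value applies to the whole binding
                concurrency: 3
                maxAttempts: 1
    ```

    Messages from either topic then arrive at the same router, and the spring.cloud.function.definition header on each message selects which consumer handles it.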
    

    The rest, I believe, is explained in the Event Routing section of the documentation, which covers routing TO and FROM a consumer as well as the different mechanisms you can use, such as application properties and message headers.

    For the outbound side you also have several options, such as StreamBridge and the ...sendto... header.

    Feel free to follow up.