
Spring Cloud Stream: Multiple functionRouters


We are using Spring Cloud Stream to listen to multiple queues.

spring:
  cloud:
    function:
      definition: functionRouter;supplier
    stream:
      solace:
        bindings:
          functionRouter-in-0:
            consumer:
              ...
      bindings:
        functionRouter-in-0:
          destination: ${SOLACE_QUEUE},${SOLACE_DMQ}

As we expect multiple message formats on the queues, we use functionRouter and MessageRoutingCallback to find the correct function that handles the message, so we can leverage the automatic deserialization of JSON messages. Simplified example:

class MessageRouter : MessageRoutingCallback {

    override fun functionDefinition(message: Message<*>): String {
        val topic = message.headers[SolaceHeaders.DESTINATION]
        ...
        val isDmqEligible = message.headers[SolaceHeaders.DMQ_ELIGIBLE] as Boolean
        if (!isDmqEligible) {
            return "receiveFromDMQ"
        }
        return "receiveAndSend"
    }

}

As both queues use the same binding, we cannot set different consumer properties like backOffMaxInterval if we use only one functionRouter.

Is there any solution that uses a similar approach (routers that select the handler function based on a message header) but supports multiple routers as entry points? Something like this:

spring:
  cloud:
    function:
      definition: functionRouter1;functionRouter2;supplier
    stream:
      solace:
        bindings:
          functionRouter1-in-0:
            consumer:
              ...
          functionRouter2-in-0:
            consumer:
              ...
      bindings:
        functionRouter1-in-0:
          destination: ${SOLACE_QUEUE}
        functionRouter2-in-0:
          destination: ${SOLACE_DMQ}

I'd also be open to any solution (using Spring Cloud Stream) where we can dynamically dispatch different message types from the same channel ${SOLACE_QUEUE} while still leveraging the automatic deserialization.


Solution

  • RoutingFunction was not really designed for this kind of case since, as you said, it has a single binding and routes to other functions by reference (no queue in between). That said, you can certainly do what you suggested by simply defining another router function bean with a different name in your context:

    // The bean name (functionRouter2) is the name to list in spring.cloud.function.definition
    // and to use as the binding prefix (functionRouter2-in-0).
    @Bean
    RoutingFunction functionRouter2(FunctionCatalog functionCatalog, FunctionProperties functionProperties,
                                    BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
        return new RoutingFunction(functionCatalog, functionProperties, new BeanFactoryResolver(beanFactory), routingCallback);
    }
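
To make the "routing by reference" idea concrete, here is a minimal plain-Java sketch of two routers that share one catalog of handler functions. It does not use the Spring API: HeaderRouter, RouterSketch, the "dmqEligible" header key, and the "sent:"/"dmq:" return values are all hypothetical names chosen for illustration. It also assumes each router gets its own callback, which is a variation on the bean above (where both routers would receive the same injected MessageRoutingCallback).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a router function: it asks a callback for a
// handler name and then invokes that handler directly (no queue in between).
final class HeaderRouter {
    private final Function<Map<String, Object>, String> callback;   // role of the routing callback
    private final Map<String, Function<String, String>> catalog;    // role of the function catalog

    HeaderRouter(Function<Map<String, Object>, String> callback,
                 Map<String, Function<String, String>> catalog) {
        this.callback = callback;
        this.catalog = catalog;
    }

    String route(Map<String, Object> headers, String payload) {
        String name = callback.apply(headers);
        Function<String, String> target = catalog.get(name);
        if (target == null) {
            throw new IllegalStateException("No function named " + name);
        }
        return target.apply(payload);
    }
}

final class RouterSketch {
    // Handlers named after the functions in the question; the bodies are placeholders.
    static Map<String, Function<String, String>> catalog() {
        Map<String, Function<String, String>> c = new HashMap<>();
        c.put("receiveAndSend", p -> "sent:" + p);
        c.put("receiveFromDMQ", p -> "dmq:" + p);
        return c;
    }

    // Router for the main queue: same decision as the question's callback.
    // A missing header is treated as "not eligible" here (an assumption).
    static HeaderRouter functionRouter1() {
        return new HeaderRouter(
                h -> Boolean.TRUE.equals(h.get("dmqEligible")) ? "receiveAndSend" : "receiveFromDMQ",
                catalog());
    }

    // Router for the DMQ: always dispatches to the DMQ handler.
    static HeaderRouter functionRouter2() {
        return new HeaderRouter(h -> "receiveFromDMQ", catalog());
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("dmqEligible", true);
        System.out.println(functionRouter1().route(headers, "a"));
        System.out.println(functionRouter2().route(headers, "a"));
    }
}
```

In the real application the two entry points would be the RoutingFunction beans, each bound to its own destination, so the consumer properties can differ per binding while the handler functions stay shared.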