spring-cloud-stream, spring-cloud-stream-binder-kafka

Spring Cloud Stream functionRouter output attempts to bind to Kafka topic


I am trying to migrate to the new functional programming model in Spring Cloud Stream, replacing conditional StreamListener annotations such as:

@StreamListener("app-input", condition = "headers['eventName']=='Funded'")

with something like

@Bean
fun router() = MessageRoutingCallback {
    // Return the name of the function bean that should handle this message
    when (it.headers["eventName"]) {
        "Funded" -> "funded"
        else -> "ignored"
    }
}

@Bean
fun funded() = Consumer { message: Message<Funded> ->
    ...
}

@Bean
fun ignored() = Consumer { message: Message<*> ->
    ...
}

with associated properties linking the channel to a topic:

spring.cloud.function.definition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=MyTopic

I need this level of indirection because multiple Avro message types all arrive on MyTopic, and each needs to be deserialised and routed differently.

This all works quite happily, and I can consume and route messages as expected. However, using functionRouter this way has an unexpected and unwanted side effect: it also attempts to bind functionRouter-out-0 to Kafka, even though no such topic exists. Every 30 seconds the application tries to attach to a topic on the broker called "functionRouter-out-0" and, as you would expect, fails with an authorization error:

2021-05-06 12:57:55.654 WARN  [screening]                            o.s.c.s.b.k.p.KafkaTopicProvisioner      : No partitions have been retrieved for the topic (functionRouter-out-0). This will affect the health check.
2021-05-06 12:57:56.198 WARN  [screening]                            org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-3] Error while fetching metadata with correlation id 4 : {functionRouter-out-0=TOPIC_AUTHORIZATION_FAILED}
2021-05-06 12:57:56.199 ERROR [screening]                            org.apache.kafka.clients.Metadata        : [Producer clientId=producer-3] Topic authorization failed for topics [functionRouter-out-0]
2021-05-06 12:57:56.199 ERROR [screening]                            o.s.c.s.b.k.p.KafkaTopicProvisioner      : Failed to obtain partition information
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [functionRouter-out-0]
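
For context, the topic name in these logs matches Spring Cloud Stream's naming convention for functional bindings: a function gets bindings named <functionName>-in-<index> and <functionName>-out-<index>, and a binding with no configured destination falls back to its own name. A minimal Kotlin sketch of that convention follows. The helpers bindingNames and destinationFor are hypothetical, written only for illustration, and are not framework APIs:

```kotlin
// Hypothetical helper (not part of Spring Cloud Stream): lists the binding
// names the convention yields for a function with the given number of
// inputs and outputs.
fun bindingNames(functionName: String, inputs: Int = 1, outputs: Int = 1): List<String> =
    (0 until inputs).map { "$functionName-in-$it" } +
        (0 until outputs).map { "$functionName-out-$it" }

// Hypothetical helper: a binding without a configured destination
// uses its own name as the destination (here, the Kafka topic).
fun destinationFor(binding: String, configured: Map<String, String>): String =
    configured[binding] ?: binding
```

With only functionRouter-in-0 mapped to MyTopic, destinationFor("functionRouter-out-0", ...) yields "functionRouter-out-0", which is the topic the producer is trying to reach in the log above.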

So the question is either a) how can I stop the functionRouter-out-0 channel from attempting to bind to Kafka, or b) how else can I achieve this routing without needing the intermediary channel?

The question "Spring Cloud Stream event routing functionality autocreates new topic" is similar, but never received an answer.


Solution

  • I believe it's a bug. I opened an issue if you want to follow it:

    https://github.com/spring-cloud/spring-cloud-stream/issues/2168

    As a workaround, simply point the output binding at the same destination as the input:

    spring.cloud.function.definition=functionRouter
    spring.cloud.stream.bindings.functionRouter-in-0.destination=so67419839
    spring.cloud.stream.bindings.functionRouter-out-0.destination=so67419839
    spring.cloud.stream.bindings.functionRouter-in-0.group=so67419839
    

    Since the delegates are all Consumers, nothing will ever actually be sent to the output destination.
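
To illustrate why nothing is sent, here is a framework-free Kotlin sketch. It assumes the router is exposed as a Function (which would explain the output binding) while every delegate is a Consumer. SketchRouter, handled and sketchRouter are hypothetical names for this illustration only; this is a simplified model, not Spring's own routing implementation:

```kotlin
import java.util.function.Consumer
import java.util.function.Function

// Hypothetical stand-in for the framework's router. It is typed as a
// Function, so it nominally has an output side.
class SketchRouter(
    private val delegates: Map<String, Consumer<String>>,
    private val route: (String) -> String,
) : Function<String, Any?> {
    override fun apply(eventName: String): Any? {
        // Pick the delegate named by the routing callback and hand it the input.
        delegates.getValue(route(eventName)).accept(eventName)
        // A Consumer produces no value, so there is nothing to publish.
        return null
    }
}

// Records which delegate handled each event so the behaviour can be inspected.
val handled = mutableListOf<String>()

val sketchRouter = SketchRouter(
    delegates = mapOf(
        "funded" to Consumer<String> { handled += "funded:$it" },
        "ignored" to Consumer<String> { handled += "ignored:$it" },
    ),
    route = { if (it == "Funded") "funded" else "ignored" },
)
```

Calling sketchRouter.apply("Funded") runs the funded delegate and returns null, so in this model the output side never has a value to send, whichever destination it is bound to.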