I am trying to migrate to the new functional programming model for Spring Cloud Stream, replacing conditional StreamListener annotations like this
@StreamListener("app-input", condition = "headers['eventName']=='Funded'")
with something like
@Bean
fun router() = MessageRoutingCallback {
when (it.headers["eventName"]) {
"Funded" -> "funded"
else -> "ignored"
}
}
@Bean
fun funded() = Consumer { message: Message<Funded> ->
...
}
@Bean
fun ignored() = Consumer { message: Message<*> ->
...
}
with associated properties linking the channel to a topic
spring.cloud.function.definition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=MyTopic
I need this level of indirection because there are multiple Avro Message types all arriving on MyTopic which need to be deserialised and routed differently
This all works quite happily, and I can consume and route messages as expected. However there is an unexpected and unwanted side effect of using functionRouter in this way, which is that it attempts to bind functionRouter-out-0 to Kafka as well, when there is no topic available and so every 30 seconds the application attempts to attach to a topic on the broker called "functionRouter-out-0" and fails with an authorization error, as you would expect.
2021-05-06 12:57:55.654 WARN [screening] o.s.c.s.b.k.p.KafkaTopicProvisioner : No partitions have been retrieved for the topic (functionRouter-out-0). This will affect the health check.
2021-05-06 12:57:56.198 WARN [screening] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-3] Error while fetching metadata with correlation id 4 : {functionRouter-out-0=TOPIC_AUTHORIZATION_FAILED}
2021-05-06 12:57:56.199 ERROR [screening] org.apache.kafka.clients.Metadata : [Producer clientId=producer-3] Topic authorization failed for topics [functionRouter-out-0]
2021-05-06 12:57:56.199 ERROR [screening] o.s.c.s.b.k.p.KafkaTopicProvisioner : Failed to obtain partition information
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [functionRouter-out-0]
So the question is either a) how can I stop the functionRouter-out-0 channel attempting to bind to Kafka, or b) how else can I achieve this without needing the intermediary channel?
Spring Cloud Stream event routing functionality autocreates new topic is similar, but didn't ever receive an answer.
I believe it's a bug. I opened an issue if you want to follow it:
https://github.com/spring-cloud/spring-cloud-stream/issues/2168
As a work around simply point it to the same destination
spring.cloud.function.definition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=so67419839
spring.cloud.stream.bindings.functionRouter-out-0.destination=so67419839
spring.cloud.stream.bindings.functionRouter-in-0.group=so67419839
Since the delegates are all Consumer
we'll never actually send anything.