I'm trying to use Spring Cloud Stream to process messages sent to an Azure Event Hub instance. Each message should be routed to a tenant-specific topic on a Kafka cluster, with the topic determined at runtime from the message content. For development purposes, I'm running Kafka locally via Docker. I've done some research on bindings that are not known at configuration time and found that dynamic destination resolution might be exactly what I need for this scenario.
However, the only way I have found to get my solution working is to use StreamBridge. I would rather use the dynamic destination header spring.cloud.stream.sendto.destination, so that the processor can be written as a Function<> instead of a Consumer<> (it is not really a sink). My main concern is that the final solution will be deployed with Spring Cloud Data Flow, and I'm afraid I will have trouble configuring the streams if I use StreamBridge.
Moving on to the code, this is the processor function (I stripped away the unrelated parts):
private static final String OUTPUT_DESTINATION_TEMPLATE = "%s.gateway-report";
private static final String STREAM_DESTINATION_HEADER = "spring.cloud.stream.sendto.destination";
private static final String TENANT_ID_HEADER = "tenant-id";

@Bean
public Function<Message<String>, Message<String>> routeMessageToTenantDestination(
        TenantGatewayDeviceService gatewayDeviceService) {
    return msg -> {
        final String tenantId = "test";
        final String destination = String.format(OUTPUT_DESTINATION_TEMPLATE, tenantId);
        return MessageBuilder.withPayload(msg.getPayload())
                .setHeader(STREAM_DESTINATION_HEADER, destination)
                .setHeader(TENANT_ID_HEADER, tenantId)
                .build();
    };
}
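As a minimal sketch, the header logic of the function above can be pulled out into plain Java so that the computed destination name can be checked without a broker or a Spring context. The class name TenantRoutingHeaders and the helper headersFor are hypothetical names introduced only for this illustration; they are not part of Spring Cloud Stream.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TenantRoutingHeaders {
    // Same constants as in the processor function above.
    static final String OUTPUT_DESTINATION_TEMPLATE = "%s.gateway-report";
    static final String STREAM_DESTINATION_HEADER = "spring.cloud.stream.sendto.destination";
    static final String TENANT_ID_HEADER = "tenant-id";

    // Hypothetical helper: builds the two headers the function sets on
    // the outgoing message for a given tenant.
    static Map<String, Object> headersFor(String tenantId) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put(STREAM_DESTINATION_HEADER,
                String.format(OUTPUT_DESTINATION_TEMPLATE, tenantId));
        headers.put(TENANT_ID_HEADER, tenantId);
        return headers;
    }

    public static void main(String[] args) {
        // prints {spring.cloud.stream.sendto.destination=test.gateway-report, tenant-id=test}
        System.out.println(headersFor("test"));
    }
}
```

Inside the Spring function, a map like this could presumably be applied with MessageBuilder's copyHeaders(...) instead of the two setHeader(...) calls; that wiring is left out here to keep the sketch dependency-free.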
This is my application.yml:
spring:
  cloud:
    stream:
      bindings:
        routeMessageToTenantDestination-in-0:
          binder: kafka-evthub
          destination: gateway-report
          group: report-processor
      dynamic-destinations:
      binders:
        kafka-ioc:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder:
              brokers: localhost:29092
        kafka-evthub:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder:
              brokers: xxxxxxxxxxx.servicebus.windows.net:9093
              configuration:
                sasl:
                  jaas:
                    config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://xxxxxxxxxxx.servicebus.windows.net/;SharedAccessKeyName=*******;SharedAccessKey=********";
                  mechanism: PLAIN
                security.protocol: SASL_SSL
      default-binder: kafka-ioc
My relevant dependencies in pom.xml:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
This is the exception I get each time the function fires:
2022-01-20 10:56:18.848 ERROR 2258917 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [... stripped away ...]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:385)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:79)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:442)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:416)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:125)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:119)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:42)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2588)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2569)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2483)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2405)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2284)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1958)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1353)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1344)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
at org.springframework.cloud.stream.function.StreamBridge.resolveDestination(StreamBridge.java:276)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.doSendMessage(FunctionConfiguration.java:604)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:597)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
... 32 more
I've tried different things, for instance manually creating the destination topic and configuring an explicit destination binding with the same name as the one assigned to the header (not a definitive solution, just for testing), but I keep getting this exception. I've also tried providing a NewDestinationBindingCallback<>; I can see from a log statement that the framework enters the method, but I still get the same error.
This also happens with the other approach for integrating Spring Cloud Stream with Event Hubs, namely the azure-spring-cloud-stream-binder-eventhubs library.
As I said, I've found a workaround by relying on StreamBridge, but this solution seems less desirable to me and I would like to understand what I'm missing.
EDIT: I made a small step forward and managed to make it work by downgrading the Spring Boot starter parent version from 2.6.2 to 2.4.4
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.4</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
and setting
<properties>
    <spring-cloud.version>2020.0.2</spring-cloud.version>
</properties>
instead of 2021.0.0 in pom.xml, as found in the sample provided by sobychacko. However, this looks like a regression. Or is something missing in my configuration to make this work with the most recent version?
I'm not sure what exactly is causing the issues you have. I just created a basic sample app demonstrating the sendto.destination header and verified that the app works as expected. It is a multi-binder application connected to two Kafka clusters. The function consumes from the first cluster and then, using the sendto header, produces the output to the second cluster. Compare the code and config in this sample with your app and see what is missing.
I see references to StreamBridge in the stack trace you shared. However, when using the sendto.destination header, it shouldn't go through StreamBridge.