I'm trying to get Spring Cloud Sleuth trace ids to propagate into Spring Cloud Stream Kinesis messages.
Part of my configuration in application.yml file:
spring:
cloud:
stream:
kinesis:
binder:
headers:
- spring.cloud.function.definition
- env
- X-B3-TraceId
This is how I'm publishing the message:
streamBridge.send(SERVICE_STREAM, "kinesis", message);
This is how I'm building the message:
protected <T> Message<T> buildMessagefrom(T entity) {
val clazz = entity.getClass();
return MessageBuilder
.withPayload(entity)
...some other headers
.setHeader("env", "local")
.setHeader("spring.cloud.function.definition", "myConsumer")
.setHeader("X-B3-TraceId", "123")
.build();
}
I wrote a test to check the presence of the headers in the message being published and this is my consumer:
@Bean("myConsumer")
Consumer<Message<AppointmentCreatedEvent>> myConsumer() {
return message -> {
};
}
When I debug my test and check the headers in message, all of the headers I'm setting except X-B3-TraceId are present.
I followed this thread which seems to be using the annotation model: Spring Cloud Sleuth trace ID propagation via Spring Cloud Stream Binder for AWS Kinesis
I wonder if the trace ids progation is supported in the functional model?
Would a Channel Interceptor do the trick?
It can, but I don't know how that is easy going to be for you. We need an EmbeddedHeaderUtils.embedHeaders()
utility and NewDestinationBindingCallback
to intercept those dynamic channels in the StreamBrindge
and be able to modify sent messages via injected ChannelInterceptor
.
I'm looking into a way to embedded this embedded headers logic directly into Kinesis binder and its components. Turns out it is a binder specific feature and no all core Spring Cloud Stream components support this or going at all.
UPDATE
I'm not sure what is going on with your environment, but here is what I have so far with the latest Spring Cloud Stream 4.0.1
and spring-cloud-stream-binder-kinesis-3.0.0
.
My application.properties
spring.cloud.stream.bindings.kinesisConsumer-in-0.destination=my-event
spring.cloud.stream.bindings.kinesisConsumer-in-0.group=my-group
spring.cloud.stream.kinesis.binder.headers=X-B3-TraceId,my-name
Main class:
@SpringBootApplication
public class KinesisBinderObservationDemoApplication {
public static void main(String[] args) {
SpringApplication.run(KinesisBinderObservationDemoApplication.class, args);
}
@Bean
public Consumer<Message<String>> kinesisConsumer(QueueChannel testBuffer) {
return testBuffer::send;
}
@Bean
public QueueChannel testBuffer() {
return new QueueChannel();
}
}
And test class:
@SpringBootTest
@DirtiesContext
class KinesisBinderObservationDemoApplicationTests {
@Autowired
StreamBridge streamBridge;
@Autowired
QueueChannel testBuffer;
@Test
void tracesArePropagateOverKinesis() {
this.streamBridge.send("my-event",
MessageBuilder.withPayload("test data")
.setHeader("my-name", "Test Test")
.setHeader("spring.cloud.function.definition", "myConsumer")
.setHeader("X-B3-TraceId", "123")
.build());
Message<?> receive = this.testBuffer.receive(60_000);
assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isEqualTo("test data");
assertThat(receive.getHeaders())
.containsKeys("my-name", "X-B3-TraceId")
.doesNotContainKey("spring.cloud.function.definition");
}
}
So, I'm taking it back: we don't need anything extra in our configuration to propagate header over Kinesis binder using StreamBridge
. Right, we still need that spring.cloud.stream.kinesis.binder.headers
configuration property, but we always have needed it anyway.