spring-cloud-stream spring-cloud-sleuth

Send Spring Cloud Sleuth trace IDs as headers over Kinesis - functional model


I'm trying to get Spring Cloud Sleuth trace IDs to propagate into Spring Cloud Stream Kinesis messages.

Part of my configuration in the application.yml file:

spring:
  cloud:
    stream:
      kinesis:
        binder:
          headers:
            - spring.cloud.function.definition
            - env
            - X-B3-TraceId

This is how I'm publishing the message:

streamBridge.send(SERVICE_STREAM, "kinesis", message);

This is how I'm building the message:

protected <T> Message<T> buildMessagefrom(T entity) {
    val clazz = entity.getClass();
    return MessageBuilder
            .withPayload(entity)
            // ...some other headers
            .setHeader("env", "local")
            .setHeader("spring.cloud.function.definition", "myConsumer")
            .setHeader("X-B3-TraceId", "123")
            .build();
}

I wrote a test to check the presence of the headers in the message being published and this is my consumer:

@Bean("myConsumer")
Consumer<Message<AppointmentCreatedEvent>> myConsumer() {
    return message -> {

    };
}

When I debug my test and check the headers in the message, all of the headers I'm setting are present except X-B3-TraceId.

I followed this thread, which seems to use the annotation model: Spring Cloud Sleuth trace ID propagation via Spring Cloud Stream Binder for AWS Kinesis

Is trace ID propagation supported in the functional model?


Solution

  • Would a Channel Interceptor do the trick?

    It can, but I don't know how easy that is going to be for you. We need the EmbeddedHeaderUtils.embedHeaders() utility and a NewDestinationBindingCallback to intercept those dynamic channels in the StreamBridge and modify sent messages via an injected ChannelInterceptor.

    I'm looking into a way to build this embedded-headers logic directly into the Kinesis binder and its components. It turns out that this is a binder-specific feature, and not all core Spring Cloud Stream components support it or are going to support it at all.

    UPDATE

    I'm not sure what is going on with your environment, but here is what I have so far with the latest Spring Cloud Stream 4.0.1 and spring-cloud-stream-binder-kinesis-3.0.0.

    My application.properties:

    spring.cloud.stream.bindings.kinesisConsumer-in-0.destination=my-event
    spring.cloud.stream.bindings.kinesisConsumer-in-0.group=my-group
    spring.cloud.stream.kinesis.binder.headers=X-B3-TraceId,my-name
    

    Main class:

    @SpringBootApplication
    public class KinesisBinderObservationDemoApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(KinesisBinderObservationDemoApplication.class, args);
        }
    
        @Bean
        public Consumer<Message<String>> kinesisConsumer(QueueChannel testBuffer) {
            return testBuffer::send;
        }
    
        @Bean
        public QueueChannel testBuffer() {
            return new QueueChannel();
        }
    
    }
    

    And test class:

    @SpringBootTest
    @DirtiesContext
    class KinesisBinderObservationDemoApplicationTests {
    
        @Autowired
        StreamBridge streamBridge;
    
        @Autowired
        QueueChannel testBuffer;
    
        @Test
        void tracesArePropagateOverKinesis() {
            this.streamBridge.send("my-event",
                    MessageBuilder.withPayload("test data")
                            .setHeader("my-name", "Test Test")
                            .setHeader("spring.cloud.function.definition", "myConsumer")
                            .setHeader("X-B3-TraceId", "123")
                            .build());
    
            Message<?> receive = this.testBuffer.receive(60_000);
            assertThat(receive).isNotNull();
            assertThat(receive.getPayload()).isEqualTo("test data");
            assertThat(receive.getHeaders())
                    .containsKeys("my-name", "X-B3-TraceId")
                    .doesNotContainKey("spring.cloud.function.definition");
        }
    
    }
    

    So, I'm taking it back: we don't need anything extra in our configuration to propagate headers over the Kinesis binder using StreamBridge. True, we still need the spring.cloud.stream.kinesis.binder.headers configuration property, but we have always needed it anyway.
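
To make the role of the headers property more concrete, here is a minimal plain-Java sketch of the idea behind embedded headers. A Kinesis record carries only a data blob and has no native header fields, so a binder has to pack the selected headers into the payload and unpack them on the consumer side. Everything in this sketch is an assumption made for illustration: the class EmbeddedHeadersSketch, its methods, and the byte layout are hypothetical and are not the format or the API that Spring Cloud Stream actually uses. It only models why a custom header that is missing from the configured list does not reach the consumer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model only: the class, its methods and the byte layout are
// invented for illustration and are NOT the binder's real wire format.
final class EmbeddedHeadersSketch {

    /** Headers and payload recovered from one data blob. */
    static final class Decoded {
        final Map<String, String> headers;
        final byte[] payload;

        Decoded(Map<String, String> headers, byte[] payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    /** Keeps only the headers named in the configured list (models the headers property). */
    static Map<String, String> selectHeaders(Map<String, String> all, List<String> configured) {
        Map<String, String> kept = new LinkedHashMap<>();
        for (String name : configured) {
            if (all.containsKey(name)) {
                kept.put(name, all.get(name));
            }
        }
        return kept;
    }

    /** Producer side: writes the header count, each key/value pair, then the raw payload. */
    static byte[] embed(Map<String, String> headers, byte[] payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(headers.size());
            for (Map.Entry<String, String> entry : headers.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeUTF(entry.getValue());
            }
            out.write(payload);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    /** Consumer side: reads the headers back and treats the remaining bytes as the payload. */
    static Decoded extract(byte[] blob) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(blob));
            int count = in.readInt();
            Map<String, String> headers = new LinkedHashMap<>();
            for (int i = 0; i < count; i++) {
                String key = in.readUTF();
                String value = in.readUTF();
                headers.put(key, value);
            }
            return new Decoded(headers, in.readAllBytes());
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        Map<String, String> sent = new LinkedHashMap<>();
        sent.put("my-name", "Test Test");
        sent.put("spring.cloud.function.definition", "myConsumer");
        sent.put("X-B3-TraceId", "123");

        // Mirrors spring.cloud.stream.kinesis.binder.headers=X-B3-TraceId,my-name
        List<String> configured = List.of("X-B3-TraceId", "my-name");

        byte[] blob = embed(selectHeaders(sent, configured),
                "test data".getBytes(StandardCharsets.UTF_8));
        Decoded received = extract(blob);

        System.out.println(received.headers); // {X-B3-TraceId=123, my-name=Test Test}
        System.out.println(new String(received.payload, StandardCharsets.UTF_8)); // test data
    }
}
```

In this model, spring.cloud.function.definition is dropped because it is not in the configured list, which matches the doesNotContainKey assertion in the test above. If X-B3-TraceId is missing on the consumer side, the first thing to check is that the property is actually picked up by the binder in that environment.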