Distributed tracing is not working with Spring WebFlux + Reactor Kafka


I am using Spring WebFlux with Spring Boot 3.2.0 and the native Reactor Kafka client.

Spring Cloud Sleuth has been moved to Micrometer Tracing.

You can read about it here:

Spring Cloud Sleuth’s last minor version is 3.1. You can check the 3.1.x branch for the latest commits. The core of this project got moved to Micrometer Tracing project and the instrumentations will be moved to Micrometer and all respective projects (no longer all instrumentations will be done in a single repository).

I want to implement distributed tracing; I need the traceId and spanId in my logs.

I used the following dependencies for tracing:

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core-micrometer</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing-bridge-brave</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

Kafka Configuration

@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaConfiguration {
    private final KafkaProperties kafkaProperties;
    private final KafkaTopicProperties topicProperties;

    @Bean
    public <K, V> ReactiveKafkaProducerTemplate<K, V> reactiveKafkaProducerTemplate(ObservationRegistry registry, PropagatingSenderTracingObservationHandler<?> handler) { // Generic producer; if needed, create a custom producer for a specific object.
        registry.observationConfig().observationHandler(handler);

        var properties = kafkaProperties.buildProducerProperties(new DefaultSslBundleRegistry());
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", kafkaProperties.getBootstrapServers()));

        var senderOptions = SenderOptions.<K, V>create(properties)
                .withObservation(registry, new KafkaSenderObservation.DefaultKafkaSenderObservationConvention());
        return new ReactiveKafkaProducerTemplate<>(senderOptions);
    }

    @Bean(name = "otpSendReactiveKafkaConsumer")
    public ReactiveKafkaConsumerTemplate<String, OtpSendRequestTransferObject> otpSendReactiveKafkaConsumer(ObservationRegistry registry, PropagatingReceiverTracingObservationHandler<?> handler) {
        registry.observationConfig()
                .observationHandler(handler);
        return new ReactiveKafkaConsumerTemplate<>(createReceiverOptions(List.of(topicProperties.getOtp().getTopic()), registry));
    }


    @Bean(name = "smsSendReactiveKafkaConsumer")
    public ReactiveKafkaConsumerTemplate<String, SmsSendRequestTransferObject> smsSendReactiveKafkaConsumer(ObservationRegistry registry, PropagatingReceiverTracingObservationHandler<?> handler) {
        registry.observationConfig()
                .observationHandler(handler);
        return new ReactiveKafkaConsumerTemplate<>(createReceiverOptions(List.of(topicProperties.getSms().getTopic()), registry));
    }

    private <K, V> ReceiverOptions<K, V> createReceiverOptions(List<String> topics, ObservationRegistry registry) {

        var properties = kafkaProperties.buildConsumerProperties(new DefaultSslBundleRegistry());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", kafkaProperties.getBootstrapServers()));

        return ReceiverOptions.<K, V>create(properties)
                .withObservation(registry, new KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention())
                .subscription(topics);

    }

}

Kafka Producer

@Service
@Slf4j
@RequiredArgsConstructor
public class SendSMSReactiveKafkaProducer {
    private final KafkaTopicProperties kafkaTopicProperties;
    private final ReactiveKafkaProducerTemplate<String, SmsSendRequestTransferObject> kafkaProducerTemplate;

    public Mono<CommonResult> produce(SmsSendRequest request) {
        var topic = kafkaTopicProperties.getSms().getTopic();

        var kafkaHeaders = new RecordHeaders();
        var message = SmsSendRequestTransferObject.builder()
                .content(request.getContent())
                .receiver(request.getReceiver())
                .build();
        var record = new ProducerRecord<>(topic, null, UUID.randomUUID().toString(), message, kafkaHeaders);

        return kafkaProducerTemplate.send(record)
                .doOnError(ex -> log.error("Failed to send record={} to topic={}.", record, topic, ex))
                .doOnSuccess(result -> log.debug("Record is successfully sent to topic={}, metadata={}", topic, result.recordMetadata()))
                .map(result -> {
                    var hasResult = HasResult.SUCCESS;
                    if (result.exception() != null) {
                        hasResult = HasResult.UNKNOWN_ERROR;
                    }
                    return new CommonResult(hasResult);
                });
    }
}
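
For context, here is a hypothetical WebFlux endpoint that triggers the producer (the SmsController class and the /sms mapping are illustrative, not part of the original code). With automatic context propagation enabled (see below), the trace started for the incoming HTTP request is expected to follow the reactive chain into produce():

import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
@RequiredArgsConstructor
public class SmsController {
    private final SendSMSReactiveKafkaProducer producer;

    @PostMapping("/sms")
    public Mono<CommonResult> send(@RequestBody SmsSendRequest request) {
        // The traceId/spanId of the incoming HTTP request should still be
        // visible in log statements emitted inside this chain.
        return producer.produce(request);
    }
}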

Kafka Consumer


@Service
@Slf4j
public class SendSMSReactiveKafkaConsumer extends AbstractBaseReactiveKafkaConsumer<String, SmsSendRequestTransferObject> {
    private final RetryBackoffSpec retryBackoffSpec;
    private final SmsMessageSender smsMessageSender;
    private final ReactiveKafkaConsumerTemplate<String, SmsSendRequestTransferObject> smsSendReactiveKafkaConsumer;

    public SendSMSReactiveKafkaConsumer(ReactiveKafkaConsumerTemplate<String, SmsSendRequestTransferObject> smsSendReactiveKafkaConsumer,
                                        ReactiveKafkaProducerTemplate<String, ErrorRecord<SmsSendRequestTransferObject>> dlqProducerTemplate,
                                        SmsMessageSender smsMessageSender, KafkaTopicProperties topicProperties,
                                        SmsSettingProperties smsSettingProperties) {
        super(topicProperties.getSms().getDlq(), dlqProducerTemplate);
        var params = smsSettingProperties.getRetryBackOffSpec().getKafka();
        this.smsMessageSender = smsMessageSender;
        this.smsSendReactiveKafkaConsumer = smsSendReactiveKafkaConsumer;
        this.retryBackoffSpec = RetryBackoffSpecUtil.createRetryBackoffSpec(params.getMaxAttempts(), params.getBackOffPeriod());
    }

    @PostConstruct
    public void init() {
        consume()
                .onErrorContinue((throwable, o) -> log.error("Error while initializing Kafka consumer.", throwable))
                .subscribe();
    }

    @Override
    public Flux<Void> consume() {
        return smsSendReactiveKafkaConsumer.receiveAtMostOnce() // Be careful when choosing the receive mode: receiveAtMostOnce commits each message immediately, so the commit is guaranteed, but a message that fails during processing will not be redelivered.
                .onErrorResume(Throwable.class, ex -> {
                    log.error("Exception on receiving message from Kafka.", ex);
                    return Mono.empty();  // Continue processing the next message
                })
                .flatMap(message -> {
                            return Mono.just(message)
                                    .doOnNext(this::log)
                                    .flatMap(record -> smsMessageSender.send(record.value(), retryBackoffSpec))
                                    .then()
                                    .onErrorResume(Throwable.class, ex -> {
                                        log.error("Error during message processing.", ex);
                                        return sendToDLQ(message, ex); // Send message to DLQ topic
                                    });
                        }
                )
                .doOnTerminate(() -> log.error("The subscription terminated, either by completing or with an error."))
                .subscribeOn(Schedulers.boundedElastic());
    }


}

I configured the application.yaml file like this:

logging:
  pattern:
    level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"

This works well, but when I publish a message to a Kafka topic, I lose the current traceId and spanId.

I also configured WebFlux with Hooks.enableAutomaticContextPropagation(). When a request arrives at an endpoint, tracing works and a traceId and spanId are generated; the current trace is also kept across different reactive contexts.
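
A minimal sketch of where that hook is installed, assuming a standard Spring Boot entry point (the class name is illustrative):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.core.publisher.Hooks;

@SpringBootApplication
public class NotificationApplication {
    public static void main(String[] args) {
        // Propagate ThreadLocal values (such as the current observation/trace)
        // automatically across Reactor operators and thread boundaries.
        Hooks.enableAutomaticContextPropagation();
        SpringApplication.run(NotificationApplication.class, args);
    }
}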


So the problem is that I lose my traceId and spanId after publishing a message to Kafka. How do I configure reactive context propagation for Kafka?


Solution

  • You have to configure an observation on the SenderOptions explicitly. It is not auto-configured by Spring Boot, since there is simply no auto-configuration for Reactor Kafka. (See the sketches below.)

    See more info in the reference documentation: https://projectreactor.io/docs/kafka/release/reference/ (search for "5.5. Micrometer Observation").

    You must also add the io.micrometer:micrometer-observation dependency.
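
    For example (Maven):

        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-observation</artifactId>
        </dependency>

    On the consumer side, the same documentation section notes that the receiver observation only covers the polled record itself; to restore the trace for downstream processing, an observation has to be opened manually around each record and put into the Reactor context. Below is a sketch adapted from that section and fitted to the consumer above; the receiver id "sms.receiver" and the bootstrapServers/observationRegistry fields are assumptions, not names from the original code:

import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.observation.KafkaReceiverObservation;
import reactor.kafka.receiver.observation.KafkaRecordReceiverContext;

@Override
public Flux<Void> consume() {
    return smsSendReactiveKafkaConsumer.receiveAtMostOnce()
            .flatMap(record -> {
                // Open a receiver observation for this record; this restores
                // the trace propagated in the record headers.
                var receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
                        null,
                        new KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention(),
                        () -> new KafkaRecordReceiverContext(record, "sms.receiver", bootstrapServers),
                        observationRegistry);
                return Mono.just(record)
                        .flatMap(r -> smsMessageSender.send(r.value(), retryBackoffSpec))
                        .then()
                        .doOnTerminate(receiverObservation::stop)
                        .doOnError(receiverObservation::error)
                        // Make the observation (and therefore the traceId/spanId)
                        // visible to operators and log statements in this chain.
                        .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
            });
}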