Search code examples
javaspring-webfluxspring-kafkaproject-reactorreactor-kafka

How to move to the next offset when RecordDeserializationException occurs reactor-kafka receiver?


When using spring-webflux with reactor-kafka receiver how can I manually move/commit offset when RecordDeserializationException occurs? From RecordDeserializationException I can get partition and offset, but I cannot manualy create a ReceiverOffset object which would allow me to commit (as it has private implementation).

   reactiveKafkaReceiver
            .receiveBatch()
            .onErrorResume(e -> {
                RecordDeserializationException rde = (RecordDeserializationException) e;
                TopicPartition topicPartition = rde.topicPartition();
                long offset = rde.offset();
                // how can I commit this offset?
                return Flux.empty();
            })
            .delayUntil(flux -> flux
                    .collectList()
                    .delayUntil(this::process)
                    .doOnNext(records -> records.forEach(record -> record.receiverOffset()
                            .commit()
                            .subscribeOn(Schedulers.boundedElastic())
                            .subscribe())))
            .retryWhen((Retry.backoff(3, Duration.ofSeconds(2)).transientErrors(true)))
            .repeat()
            .subscribe();

Is there any solution here?


Solution

  • Following Gary Russell's answer and its link to https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer here is a solution which transforms deserialization errors into null-value (it also automatically adds error info into kafka headers). In such case we should just add null check condition when we process the records in the main receiver stream.

    @Configuration
    @RequiredArgsConstructor
    public class KafkaConsumerConfig {
        private final KafkaConsumerProperty KafkaConsumerProperty;
        
        @Bean
        public KafkaReceiver<String, KafkaMessageDto> reactiveKafkaReceiver() {
            Map<String, Object> config = new HashMap<>();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperty.getBootstrapServers());
            ...
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
            config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
            config.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedDeserializationFunction.class);
            config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, KafkaMessageDto.class);
    
            ReceiverOptions<String, KafkaMessageDto> receiverOptions = ReceiverOptions.<String, KafkaMessageDto>create(config)
                  .subscription(Collections.singletonList(kafkaConsumerProperty.getTopicName()));
           return KafkaReceiver.create(receiverOptions);
       }
    }
    
    
    @Slf4j
    public class FailedDeserializationFunction implements Function<FailedDeserializationInfo, Object> {
        @Override
        public Object apply(FailedDeserializationInfo info) {
            log.warn("Fail to deserialize kafka message from topic=" + info.getTopic() + ". Set it to null. ",
                    info.getException());
            return null;
        }
    }