Tags: java, validation, apache-kafka, spring-kafka

Kafka message validation in ErrorHandlingDeserializer


I don't understand how to configure ErrorHandlingDeserializer so that it validates incoming messages. This is my current configuration:

spring:
  json:
    use:
      type:
        headers: false
  kafka:
    consumer:
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.deserializer.validator.class: org.springframework.validation.beanvalidation.LocalValidatorFactoryBean

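However, a LocalValidatorFactoryBean configured this way does not validate the message against its jakarta.validation.constraints annotations.

How should validation be configured with ErrorHandlingDeserializer?

Or is registering the validator on the listener registrar the only way to add one?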

@Configuration
@RequiredArgsConstructor
public class KafkaListenerConfig implements KafkaListenerConfigurer {

    private final LocalValidatorFactoryBean validator;

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setValidator(this.validator);
    }
}

Simple bean validation (adding the Validator to the registrar) works, but only for a single-record listener, not for a batch listener:

@KafkaListener(id = "businessListener", topics = "${app.topic}", batch = "true", idIsGroup = false)
void listen(List<ConsumerRecord<String, @Valid Business>> business) {
    // not validated
}

@KafkaListener(id = "businessListener", topics = "${app.topic}", idIsGroup = false)
void listen(@Valid Business business) {
    // validated
}

Solution

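  • Finally resolved by adding my own Validator class and pointing the spring.deserializer.validator.class property at it. The deserializer presumably instantiates the configured class through its no-arg constructor, outside the Spring context, so a bare LocalValidatorFactoryBean is never initialized and has no underlying validator to call.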

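    import jakarta.validation.Validation;
    import org.springframework.validation.beanvalidation.SpringValidatorAdapter;

    // Needs a public no-arg constructor, since the deserializer creates it from the class name.
    public class DeserializerValidator extends SpringValidatorAdapter {

        public DeserializerValidator() {
            // Build a standalone Jakarta Validator; no Spring context is involved here.
            super(Validation.buildDefaultValidatorFactory().getValidator());
        }
    }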
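
For completeness, the consumer configuration from the question would then reference this class instead of LocalValidatorFactoryBean. This is only a sketch: the package name com.example is a hypothetical placeholder, and the other settings are copied from the question unchanged.

```yaml
spring:
  kafka:
    consumer:
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        # Hypothetical package; use the fully qualified name of your own DeserializerValidator.
        spring.deserializer.validator.class: com.example.DeserializerValidator
```

Because the property takes a class name rather than a bean, the validator cannot rely on injected Spring dependencies; anything it needs has to be built in its constructor, as DeserializerValidator does.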