Search code examples
javaapache-kafkaproject-reactorconfluent-schema-registryreactor-kafka

Unable to use schemaRegistry in reactor kafka


I am tying to setup kafka consumer using reactor kafka . Producer is integrated with kafka schema registry



    @Value("${spring.kafka.schemaRegistryUrls}")
    private String schemaRegistryEnvVarValue;


 @Bean
    public ReceiverOptions<String, MyProto> kafkaReceiverOptionsFloor(
            KafkaProperties kafkaProperties) {

        final Map<String, Object> kafkaConsumerProperties =
                kafkaProperties.buildConsumerProperties();
        for (Map.Entry<String, KafkaProperties.Consumer> entry :
                kafkaConsumerPropertiesConfig.getConsumer().entrySet()) {
            if (kafkaTopics.contains(entry.getKey())) {
                kafkaConsumerProperties.putAll(entry.getValue().buildProperties());
            }
        }
        kafkaConsumerProperties.put("schema.registry.url", schemaRegistryEnvVarValue);
        final ReceiverOptions<String, MyProto> basicReceiverOptions =
                ReceiverOptions.<String, MyProto>create(
                                kafkaConsumerProperties)
                        .withValueDeserializer(new MyProtoDeserializer())
                        // disabling auto commit, since we are managing committing once
                        // record is
                        // processed
                        .commitInterval(Duration.ZERO)
                        .commitBatchSize(0);

        kafkaConsumerProperties.forEach((k, v) -> log.debug("k2 {} v2 {}", k, v));

        return basicReceiverOptions
                .subscription(kafkaTopicsFloor)
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
    }


  @Bean
    public ReactiveKafkaConsumerTemplate<String, MyProto>
            reactiveKafkaConsumerTemplate(
                    ReceiverOptions<String, MyProto>
                            kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
    }

I am getting exception as Protocol message contained an invalid tag (zero). Its able to parse in my Unit tests (without schema registry)

Looks like schemaregistry is not being used . what am i doing wrong here .

Deserializer looks like below

@Slf4j
public class MyProtoDeserializer implements Deserializer<MyProto> {
    public MyProtoDeserializer() {}

    /**
     * Deserializes the data to my_proto from byte array.
     *
     * @param topic
     * @param data
     * @return
     */
    @Override
    public MyProto deserialize(final String topic, final byte[] data) {
        if (data == null) {
            return null;
        }
        // TODO: Use schemaregistry and kpow
      
        try {
            return MyProto.getDefaultInstance()
                    .getParserForType()
                    .parseFrom(data);
        } catch (Exception ex) {
            log.debug("Exception in MyProto parse {}", ex.getMessage());
            return MyProto.getDefaultInstance();
        }
    }
}

Solution

  • Reactor isn't the issue.

    schema.registry.url is only a property of the Confluent Deserializer class. You are not implementing configure function in the Deserializer, therefore you are ignoring that property. Similarly, directly calling parseFrom isn't using any HTTP client to interact with a Registry.

    Import the library, rather than write your own

    https://mvnrepository.com/artifact/io.confluent/kafka-protobuf-serializer/7.4.0

    Also, this is how to auto configure Spring Boot with that property

    spring:
      kafka:
        consumer:
          value-deserializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
          properties:
            "[schema.registry.url]": http://...
    

    ref https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.additional-properties