java apache-kafka spring-kafka confluent-schema-registry

KafkaJsonSchemaSerializer adds initial null bytes (00 00 00 00 0C 7B) to the record value


Using the following Spring Boot properties

spring:
  profiles:
    active: "ssl"
  kafka:
    producer:
      client-id: ${SPRING_KAFKA_PRODUCER_CLIENT_ID:kafka-producer}
      bootstrap-servers: ${SPRING_KAFKA_PRODUCER_BOOTSTRAP_SERVERS:localhost:9092}
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer

the published JSON values start with null bytes (00 00 00 00 0C 7B in hex). This results in an invalid JSON string (e.g. when displayed in Offset Explorer).

The @Configuration class I'm using is as simple as

@Configuration
@EnableAutoConfiguration
@RequiredArgsConstructor
@PropertySources({
        @PropertySource("classpath:kafka.yaml"),
        @PropertySource("classpath:kafka-ssl.yaml"),
})
public class KafkaProducerConfiguration {
    private final KafkaProperties kafkaProperties;

    @Bean("producer-factory")
    public <T> ProducerFactory<String, T> producerFactory() {
        return new DefaultKafkaProducerFactory<>(this.kafkaProperties.getProducer().buildProperties(null));
    }

    @Bean("kafka-template")
    public <T> KafkaTemplate<String, T> kafkaTemplate(ProducerFactory<String, T> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}

and then I use the injected KafkaTemplate<String, T> kafkaTemplate to publish a Java object to a topic.

@Service
@RequiredArgsConstructor
public class KafkaProducerServiceImpl<T> implements KafkaProducerService<T> {

    private final KafkaTemplate<String, T> kafkaTemplate;

    @Override
    public void send(String topic, Function<T, String> key, T message) throws KafkaException {
        this.kafkaTemplate.send(topic, key.apply(message), message);
    }
}

Am I missing something in the configuration properties?


Solution

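  • This is expected for that serializer; see the Confluent docs on the wire format. Each value is prefixed with a magic byte (00) followed by a 4-byte schema ID (here 00 00 00 0C, i.e. 12). The JSON itself only begins at 7B, which is the opening brace.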

    results in an invalid JSON string (e.g. when displayed in Offset-Explorer)

    Offset Explorer doesn't have JSON Schema support in its Registry integration, last I checked. If you want plain JSON data without using the Schema Registry, use Spring Kafka's own JsonSerializer class (org.springframework.kafka.support.serializer.JsonSerializer) as the value serializer instead of anything from Confluent.

    The Schema Registry serializers will always send binary-framed data that starts with a null byte.
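
To see what those leading bytes mean, here is a minimal sketch that splits a record value into its framing header and JSON payload. It assumes the Confluent framing of one magic byte followed by a 4-byte big-endian schema ID; the class and method names (WireFormatDemo, Framed, parse) are made up for illustration and are not part of any Confluent or Spring API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only: decodes the 5-byte header
// that precedes the JSON payload in a Schema Registry framed value.
final class WireFormatDemo {

    // Simple holder for the three parts of a framed value.
    static final class Framed {
        final int magicByte;
        final int schemaId;
        final String json;

        Framed(int magicByte, int schemaId, String json) {
            this.magicByte = magicByte;
            this.schemaId = schemaId;
            this.json = json;
        }
    }

    static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    static Framed parse(byte[] value) {
        if (value.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Value is shorter than the 5-byte header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(value); // ByteBuffer is big-endian by default
        int magicByte = buffer.get();
        if (magicByte != 0) {
            throw new IllegalArgumentException("Unexpected magic byte: " + magicByte);
        }
        int schemaId = buffer.getInt();
        // Everything after the header is the UTF-8 encoded JSON document.
        String json = new String(value, HEADER_LENGTH,
                value.length - HEADER_LENGTH, StandardCharsets.UTF_8);
        return new Framed(magicByte, schemaId, json);
    }

    public static void main(String[] args) {
        // The bytes from the question (00 00 00 00 0C 7B) plus a closing brace.
        byte[] value = {0x00, 0x00, 0x00, 0x00, 0x0C, 0x7B, 0x7D};
        Framed framed = parse(value);
        System.out.println("magic=" + framed.magicByte
                + " schemaId=" + framed.schemaId
                + " json=" + framed.json);
        // prints: magic=0 schemaId=12 json={}
    }
}
```

A consumer would normally use the matching Confluent deserializer rather than strip the header by hand; the sketch is only meant to show why a tool that treats the whole value as text reports invalid JSON.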