Using the following Spring-Boot properties
spring:
  profiles:
    active: "ssl"
  kafka:
    producer:
      client-id: ${SPRING_KAFKA_PRODUCER_CLIENT_ID:kafka-producer}
      bootstrap-servers: ${SPRING_KAFKA_PRODUCER_BOOTSTRAP_SERVERS:localhost:9092}
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
the published JSON values start with null bytes (00 00 00 00 0C 7B in hex).
This results in an invalid JSON string (e.g. when displayed in Offset-Explorer).
The @Configuration class I'm using is as simple as
@Configuration
@EnableAutoConfiguration
@RequiredArgsConstructor
@PropertySources({
    @PropertySource("classpath:kafka.yaml"),
    @PropertySource("classpath:kafka-ssl.yaml"),
})
public class KafkaProducerConfiguration {

    private final KafkaProperties kafkaProperties;

    @Bean("producer-factory")
    public <T> ProducerFactory<String, T> producerFactory() {
        return new DefaultKafkaProducerFactory<>(this.kafkaProperties.getProducer().buildProperties(null));
    }

    @Bean("kafka-template")
    public <T> KafkaTemplate<String, T> kafkaTemplate(ProducerFactory<String, T> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
and then I use the private final KafkaTemplate<String, T> kafkaTemplate field to publish a Java object to a topic:
@Service
@RequiredArgsConstructor
public class KafkaProducerServiceImpl<T> implements KafkaProducerService<T> {

    private final KafkaTemplate<String, T> kafkaTemplate;

    @Override
    public void send(String topic, Function<T, String> key, T message) throws KafkaException {
        this.kafkaTemplate.send(topic, key.apply(message), message);
    }
}
Am I missing something in the configuration properties?
This is expected for that serializer (see the Confluent docs).
> results in an invalid JSON string (e.g. when displayed in Offset-Explorer)
Offset Explorer doesn't have JSON Schema support in its Registry integration, last I checked. Spring-Kafka has its own JsonSerializer class (org.springframework.kafka.support.serializer.JsonSerializer) that you would want to use instead of anything from Confluent, assuming you want plain JSON data without using the Schema Registry.
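As a rough sketch of that switch, the producer properties could name Spring-Kafka's serializer instead of the Confluent one. The class and method names below (PlainJsonProducerProps, producerProps) are made up for illustration; only the config keys and serializer class names are real. The keys are written as plain strings so the snippet runs without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds producer properties that select Spring-Kafka's JsonSerializer.
final class PlainJsonProducerProps {

    static Map<String, Object> producerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        // Standard Kafka producer config keys, written as plain strings.
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Spring-Kafka's serializer writes the object as JSON bytes, without Schema Registry framing.
        props.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps("localhost:9092"));
    }
}
```

In the question's setup, the same effect should come from setting value-serializer in the YAML to that class name, or from passing a map like this one to the DefaultKafkaProducerFactory constructor.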
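Using the Schema Registry serializers will always send binary-framed data: a leading zero "magic" byte, then a four-byte schema ID, then the payload. In your dump, 00 is the magic byte, 00 00 00 0C is schema ID 12, and 7B is the opening "{" of the JSON.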
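To make the framing concrete, here is a minimal sketch that splits such a value into schema ID and JSON payload. It assumes the layout of one zero byte followed by a four-byte big-endian ID; the class and method names are hypothetical and not part of any Confluent API, and the sample bytes are the ones from the question with a closing brace (7D) appended for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: reads the Schema Registry framing from a record value.
final class WireFormatSketch {

    // 1 magic byte (0x00) + 4-byte big-endian schema ID.
    static final int HEADER_LENGTH = 5;

    static int schemaId(byte[] value) {
        ByteBuffer buffer = ByteBuffer.wrap(value); // ByteBuffer is big-endian by default
        if (value.length < HEADER_LENGTH || buffer.get() != 0) {
            throw new IllegalArgumentException("Not Schema Registry framed data");
        }
        return buffer.getInt();
    }

    static String jsonPayload(byte[] value) {
        schemaId(value); // validates the header before slicing
        return new String(value, HEADER_LENGTH, value.length - HEADER_LENGTH, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] value = {0x00, 0x00, 0x00, 0x00, 0x0C, 0x7B, 0x7D};
        System.out.println(schemaId(value));    // prints 12
        System.out.println(jsonPayload(value)); // prints {}
    }
}
```

A tool that does not understand this framing treats all of the bytes as text, which is why the value looks like invalid JSON.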