I'm having a hard time understanding how to use the Confluent serde APIs. I'm using the Confluent Platform (7.2.2) Schema Registry with Protobuf and intend to use specific message classes in my Kafka Streams application.
The following is an example from here:
private static KafkaProtobufSerde<Message> createConfiguredSerdeForRecordValues() {
    SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient();
    KafkaProtobufSerde<Message> serde = new KafkaProtobufSerde<>(schemaRegistryClient);
    Map<String, Object> serdeConfig = new HashMap<>();
    serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "demo");
    serde.configure(serdeConfig, false);
    return serde;
}
In version 7.2.2, SchemaRegistryClient is abstract. In its place, I used:
    var schemaClient = new CachedSchemaRegistryClient(schemaRegUrl, 100);
    var assetKeySerde = new KafkaProtobufSerde<>(schemaClient, AssetKey.class);
    var assetConfigSerde = new KafkaProtobufSerde<>(schemaClient, AssetConfig.class);
and then ultimately:
    KTable<AssetKey, AssetConfig> assetTable = builder.table(assetTopic, Consumed.with(assetKeySerde, assetConfigSerde));
Here AssetKey and AssetConfig are my protobuf-generated classes. However, even when I pass the schemaClient and the protobuf classes to this constructor, it still expects me to pass the schema registry URL and the protobuf class in a map to .configure(). So what is the point of the constructor I'm using above? With this config, I get the error:
    com.google.protobuf.DynamicMessage cannot be cast to class AssetKey
If I pass a map to .configure() with schema.registry.url and specific.protobuf.key.type, I get an exception saying the schema is invalid. The schema is valid though, and my producer can successfully post to the topic.
Is there a complete example that I can refer to?
Have you checked out the example from the tutorials at https://developer.confluent.io/tutorials/changing-serialization-format/kstreams.html? Its source code is at https://github.com/confluentinc/kafka-tutorials/blob/master/_includes/tutorials/serialization/kstreams/code/src/main/java/io/confluent/developer/serialization/SerializationTutorial.java
I think the constructor that passes a SchemaRegistryClient into KafkaProtobufSerde is for testing purposes only. Setting the properties should be sufficient.
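To make "setting the properties" concrete, here is a minimal sketch of the two maps that might be passed to .configure(), one for the key serde and one for the value serde. It uses only the JDK so it runs without the Confluent jars. The helper name serdeConfig, the registry URL http://localhost:8081 and the package com.example are assumptions for illustration. The property names are the two quoted in the question plus specific.protobuf.value.type, which I believe is the value-side counterpart.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: builds the property maps one might pass to KafkaProtobufSerde.configure().
final class SerdeConfigSketch {

    // Property names: the first two are quoted in the question; the third is
    // assumed to be the value-side counterpart of the key-type property.
    static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
    static final String SPECIFIC_KEY_TYPE = "specific.protobuf.key.type";
    static final String SPECIFIC_VALUE_TYPE = "specific.protobuf.value.type";

    // Hypothetical helper: picks the key-type or value-type property
    // depending on which side of the record the serde will handle.
    static Map<String, Object> serdeConfig(String registryUrl,
                                           String messageClassName,
                                           boolean isKey) {
        Map<String, Object> config = new HashMap<>();
        config.put(SCHEMA_REGISTRY_URL, registryUrl);
        config.put(isKey ? SPECIFIC_KEY_TYPE : SPECIFIC_VALUE_TYPE, messageClassName);
        return config;
    }

    public static void main(String[] args) {
        String url = "http://localhost:8081"; // assumed registry address
        // Class names below use an assumed package; substitute the real generated classes.
        Map<String, Object> keyConfig = serdeConfig(url, "com.example.AssetKey", true);
        Map<String, Object> valueConfig = serdeConfig(url, "com.example.AssetConfig", false);
        System.out.println(keyConfig);
        System.out.println(valueConfig);

        // With the Confluent classes on the classpath, the maps would be applied
        // roughly like this (not compiled here):
        //   KafkaProtobufSerde<AssetKey> assetKeySerde = new KafkaProtobufSerde<>();
        //   assetKeySerde.configure(keyConfig, true);        // true  = serde for record keys
        //   KafkaProtobufSerde<AssetConfig> assetConfigSerde = new KafkaProtobufSerde<>();
        //   assetConfigSerde.configure(valueConfig, false);  // false = serde for record values
    }
}
```

The point of the sketch is that the boolean given to .configure() and the property name have to agree. If the value serde is configured with only specific.protobuf.key.type, the deserializer may not find a specific class and could fall back to DynamicMessage, which would match the cast error in the question.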