Tags: java, apache-kafka, protocol-buffers, apache-kafka-streams, confluent-schema-registry

Confluent serdes with protobuf specific messages not working


I'm having a hard time understanding how to use the Confluent Serde APIs. I'm using the Confluent Platform (7.2.2) Schema Registry with Protobuf and intend to use specific message classes in my Kafka Streams application.

Following is an example from here:

private static KafkaProtobufSerde<Message> createConfiguredSerdeForRecordValues() {
  SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient();
  KafkaProtobufSerde<Message> serde = new KafkaProtobufSerde<>(schemaRegistryClient);
  Map<String, Object> serdeConfig = new HashMap<>();
  serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "demo");
  serde.configure(serdeConfig, false);
  return serde;
}

In version 7.2.2, SchemaRegistryClient is abstract and can't be instantiated directly. In its place, I used:

var schemaClient = new CachedSchemaRegistryClient(schemaRegUrl, 100);
var assetKeySerde = new KafkaProtobufSerde<>(schemaClient, AssetKey.class);
var assetConfigSerde = new KafkaProtobufSerde<>(schemaClient, AssetConfig.class);

and then ultimately:

KTable<AssetKey, AssetConfig> assetTable = builder.table(assetTopic, Consumed.with(assetKeySerde, assetConfigSerde));

Here AssetKey and AssetConfig are my protobuf-generated classes. However, even when I pass the schemaClient and the protobuf classes in this constructor, it still expects me to pass the schema registry URL and the protobuf class in a map to .configure(). What, then, is the point of the constructor I'm using above? With this config, I get an error:

com.google.protobuf.DynamicMessage cannot be cast to class AssetKey

If I pass a map to .configure() with schema.registry.url and specific.protobuf.key.type, I get an exception saying the schema is invalid. The schema is valid, though, and my producer can successfully post to the topic.
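
Concretely, the map I'm passing to .configure() looks roughly like this (using the config constants that correspond to the keys named above):

Map<String, Object> serdeConfig = new HashMap<>();
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegUrl);     // schema.registry.url
serdeConfig.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_KEY_TYPE, AssetKey.class);  // specific.protobuf.key.type
assetKeySerde.configure(serdeConfig, true);  // true because this serde is used for the record key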

Is there a complete example that I can refer to?


Solution

  • Have you checked out the example from the tutorials at https://developer.confluent.io/tutorials/changing-serialization-format/kstreams.html ? The source code is at https://github.com/confluentinc/kafka-tutorials/blob/master/_includes/tutorials/serialization/kstreams/code/src/main/java/io/confluent/developer/serialization/SerializationTutorial.java

    I think the constructor that passes a SchemaRegistryClient into KafkaProtobufSerde is meant for testing purposes only. Setting the properties should be sufficient; a rough sketch of what that could look like is below.
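
    As an untested sketch only, reusing schemaRegUrl, assetTopic, builder, and the generated AssetKey/AssetConfig classes from the question, and assuming the usual config constants from AbstractKafkaSchemaSerDeConfig and KafkaProtobufDeserializerConfig:

    var assetKeySerde = new KafkaProtobufSerde<>(AssetKey.class);
    var assetConfigSerde = new KafkaProtobufSerde<>(AssetConfig.class);

    Map<String, Object> serdeConfig = new HashMap<>();
    serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegUrl);
    // specific.protobuf.key.type / specific.protobuf.value.type tell the deserializer
    // to return the generated classes instead of DynamicMessage
    serdeConfig.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_KEY_TYPE, AssetKey.class);
    serdeConfig.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, AssetConfig.class);

    // The boolean flag marks whether the serde is used for record keys or values
    assetKeySerde.configure(serdeConfig, true);
    assetConfigSerde.configure(serdeConfig, false);

    KTable<AssetKey, AssetConfig> assetTable =
        builder.table(assetTopic, Consumed.with(assetKeySerde, assetConfigSerde));

    I'm not certain the specific.protobuf.*.type entries are still required when the class is already passed to the constructor, but setting them explicitly shouldn't hurt.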