I'm writing a Kafka Streams app in Java that consumes input topics created by a connector that uses the Schema Registry and Avro for both the key and value converters. The connector produces the following schemas:
key-schema: "int"
value-schema:{
"type": "record",
"name": "User",
"fields": [
{"name": "firstname", "type": "string"},
{"name": "lastname", "type": "string"}
]}
Actually, there are several topics; the key-schema is always "int" and the value-schema is always a record of some kind (User, Product, etc). My code contains the following definitions:
Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
Serde<User> userSerde = new SpecificAvroSerde<>();
userSerde.configure(serdeConfig, false);
At first I tried consuming the topic with something like
Consumed.with(Serdes.Integer(), userSerde);
but that did not work because Serdes.Integer() expects integers to be encoded using 4 bytes, whereas Avro uses a variable-length encoding. Using
Consumed.with(Serdes.Bytes(), userSerde);
worked, but I really wanted an int and not bytes, so I changed my code to this:
KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer();
KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();
keyDeserializer.configure(serdeConfig, true);
keySerializer.configure(serdeConfig, true);
Serde<Integer> keySerde = (Serde<Integer>)(Serde)Serdes.serdeFrom(keySerializer, keyDeserializer);
This made the compiler produce a warning (it doesn't like the (Serde<Integer>)(Serde) cast), but it allows me to use
Consumed.with(keySerde, userSerde);
and get an integer as the key. This works just fine and my app is behaving as expected (great!!!). But now I want to define default serdes for the key and value, and I cannot get it to work.
Setting the default value serde is simple:
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
However I cannot figure out how to define the default key serde.
I tried
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde.getClass().getName());
Produces runtime error: Could not find a public no-argument constructor for org.apache.kafka.common.serialization.Serdes$WrapperSerde
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
Produces runtime error: java.lang.Integer cannot be cast to org.apache.avro.specific.SpecificRecord
What am I missing? Thanks.
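As a side note on the encoding mismatch described in the question, here is a minimal plain-Java sketch of the two encodings. It assumes the standard Avro rule for int (zigzag followed by a variable-length encoding) and the 4-byte big-endian layout used by Kafka's Integer serde. The class and method names are made up for illustration. The Confluent serializer additionally prepends a magic byte and a 4-byte schema ID, which the sketch leaves out.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical demo class, not part of Kafka or Avro.
class AvroIntEncodingDemo {

    // Fixed-width encoding: always 4 bytes, big-endian.
    static byte[] encodeFixed(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Avro-style int encoding: zigzag first, then 7 bits per byte,
    // with the high bit set on every byte except the last one.
    static byte[] encodeAvroInt(int value) {
        int zigzag = (value << 1) ^ (value >> 31);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((zigzag & ~0x7F) != 0) {
            out.write((zigzag & 0x7F) | 0x80);
            zigzag >>>= 7;
        }
        out.write(zigzag);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        for (int key : new int[] {1, 42, 300, -1}) {
            System.out.printf("%d: fixed=%d bytes, avro=%d bytes%n",
                    key, encodeFixed(key).length, encodeAvroInt(key).length);
        }
    }
}
```

Because the byte lengths differ for most keys, a deserializer that expects exactly 4 bytes cannot read a key written in the Avro format.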
Update (version 5.5 and newer)
Confluent version 5.5 adds native support for primitive Avro types via PrimitiveAvroSerde (cf. https://github.com/confluentinc/schema-registry/blob/5.5.x/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro/PrimitiveAvroSerde.java).
Original answer (version 5.4 and older):
It's a known issue. Primitive Avro types don't work well with Confluent's Avro Serdes, because those Serdes work with GenericRecord and SpecificRecord only.
Thus, building your own Serde based on KafkaAvroSerializer and KafkaAvroDeserializer is the right approach. To be able to pass it into the config as the default Serde, you cannot use Serdes.serdeFrom, because the type information is lost due to generics type erasure and the wrapper class it returns has no public no-argument constructor (which is what the runtime error in the question reports).
However, you can write your own class that implements the Serde interface and pass your custom class into the config:
public class MySerde implements Serde<Integer> {
// delegate to KafkaAvroSerializer and KafkaAvroDeserializer, and cast the deserialized `Object` to `Integer`
}
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MySerde.class);
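Why a dedicated class helps can be sketched without Kafka on the classpath. The runtime error in the question suggests that the configured class is instantiated reflectively through a public no-argument constructor. The sketch below mimics that under this assumption; WrapperLike and DedicatedLike are hypothetical stand-ins for the wrapper returned by Serdes.serdeFrom and for a class such as MySerde, and canInstantiate is an illustrative helper, not a Kafka API.

```java
// Hypothetical demo class, not part of Kafka.
class NoArgConstructorDemo {

    // Stand-in for a wrapper that can only be built from existing instances.
    static class WrapperLike {
        final Object serializer;
        final Object deserializer;

        public WrapperLike(Object serializer, Object deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }
    }

    // Stand-in for a dedicated serde class that creates its own delegates.
    public static class DedicatedLike {
        public DedicatedLike() {
            // a real serde would create its serializer and deserializer here
        }
    }

    // Tries to create an instance from the class alone, using only a
    // public no-argument constructor.
    static boolean canInstantiate(Class<?> type) {
        try {
            type.getConstructor().newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("WrapperLike: " + canInstantiate(WrapperLike.class));
        System.out.println("DedicatedLike: " + canInstantiate(DedicatedLike.class));
    }
}
```

In the same spirit, a class like MySerde would need a public no-argument constructor and would receive its schema.registry.url setting through configure rather than through constructor arguments.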