Tags: apache-kafka, avro, apache-kafka-streams, confluent-schema-registry

Testing Kafka Processor API that uses SpecificAvroSerde


I'm trying to write a unit test for a custom stream processor and got stuck on serializing the message I need to send in the test. I followed this example from the Kafka documentation: https://kafka.apache.org/11/documentation/streams/developer-guide/testing.html . My stream uses SpecificAvroSerde for a custom class (an auto-generated Avro class), but I can't configure it in the test with a MockSchemaRegistryClient; I can only point it at the URL of a Schema Registry.

    Serde<MyCustomObject> valueSerde = new SpecificAvroSerde<>();
    Map<String, String> valueSerdeConfig = new HashMap<>();
    valueSerdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "fake");
    valueSerdeConfig.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, "true");
    valueSerde.configure(valueSerdeConfig, false);
    ConsumerRecordFactory<Long, MyCustomObject> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), valueSerde.serializer());

With KafkaAvroSerializer I can initialize it like this:

    KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistryClient);

but ConsumerRecordFactory won't take KafkaAvroSerializer as an argument.

Is there an alternative, or a way of doing this that I'm not aware of?

I'd appreciate any help, thank you.


Solution

  • Thanks for the proposed solution, but I did manage to find one today. A Serde consists of a serializer and a deserializer (see Serdes.serdeFrom: https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Serdes.html#serdeFrom-org.apache.kafka.common.serialization.Serializer-org.apache.kafka.common.serialization.Deserializer- ), so I constructed my Serde from KafkaAvroSerializer and KafkaAvroDeserializer like this:

    // client is a SchemaRegistryClient, e.g. a MockSchemaRegistryClient shared by both sides
    Serde<Object> serde = Serdes.serdeFrom(new KafkaAvroSerializer(client), new KafkaAvroDeserializer(client));
    

    Any class that implements Serializer<T> can be paired with a matching Deserializer<T> to form a Serde<T>.
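To make the pairing idea concrete, here is a minimal stand-alone sketch that runs without Kafka on the classpath. The `Serializer`, `Deserializer`, and `Serde` interfaces below are hypothetical stand-ins that only mirror the shape of Kafka's types. The `serdeFrom` helper models the idea behind `Serdes.serdeFrom` rather than reproducing its implementation. The two toy classes are `Object`-typed on the assumption that this resembles how the Avro serializer and deserializer are typed.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

public class SerdePairingSketch {

    // Hypothetical stand-ins mirroring the shape of Kafka's Serializer,
    // Deserializer and Serde. These are NOT the real Kafka interfaces.
    interface Serializer<T> { byte[] serialize(String topic, T data); }
    interface Deserializer<T> { T deserialize(String topic, byte[] data); }
    interface Serde<T> {
        Serializer<T> serializer();
        Deserializer<T> deserializer();
    }

    // Models the idea behind Serdes.serdeFrom: a Serde is just the pair.
    static <T> Serde<T> serdeFrom(Serializer<T> serializer, Deserializer<T> deserializer) {
        Objects.requireNonNull(serializer, "serializer");
        Objects.requireNonNull(deserializer, "deserializer");
        return new Serde<T>() {
            @Override public Serializer<T> serializer() { return serializer; }
            @Override public Deserializer<T> deserializer() { return deserializer; }
        };
    }

    // Toy Object-typed serializer: writes the value's string form as UTF-8.
    static final class ToStringSerializer implements Serializer<Object> {
        @Override public byte[] serialize(String topic, Object data) {
            return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
        }
    }

    // Toy Object-typed deserializer: reads the bytes back as a UTF-8 string.
    static final class Utf8Deserializer implements Deserializer<Object> {
        @Override public Object deserialize(String topic, byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    // Builds the Serde from the pair and sends one value through both halves.
    public static Object roundTrip(Object value) {
        Serde<Object> serde = serdeFrom(new ToStringSerializer(), new Utf8Deserializer());
        byte[] bytes = serde.serializer().serialize("test-topic", value);
        return serde.deserializer().deserialize("test-topic", bytes);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello")); // prints hello
    }
}
```

In the real test the same pairing applies: KafkaAvroSerializer and KafkaAvroDeserializer are both constructed with the same schema registry client, and the resulting Serde's serializer() is what gets handed to ConsumerRecordFactory.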