
Java - Dynamically Change Kafka Data Type By Input for SpecificAvroSerde


import KafkaDataType;
...
...
final Serde<KafkaDataType> eventSchema = new SpecificAvroSerde<>();
...
...
StreamsBuilder builder = new StreamsBuilder();
KStream<String, KafkaDataType> eventStream = builder.stream(STREAM_TOPIC);

My KafkaDataType is a Java class that is automatically generated from the associated .avsc Avro schema file. My understanding is that KafkaDataType must be pre-defined. However, are there existing methods that allow for a dynamic or generic KafkaDataType? If so, a sample code block would be much appreciated.

The goal is to make KafkaDataType generic, so that Kafka streams with different Avro schemas can be swapped in and out and processed by the same Java code. Currently, for each different Avro schema, I need to change KafkaDataType to the specific Java class generated from that .avsc schema.

Let me know if things need more clarification.


Solution

  • My understanding is that KafkaDataType must be pre-defined.

    For a SpecificRecord type used with SpecificAvroSerde, yes.

    However, are there existing methods that allow for a dynamic or generic KafkaDataType?

    Generated Avro classes extend SpecificRecordBase, which implements GenericRecord as well as SpecificRecord. You can therefore use GenericRecord with GenericAvroSerde in place of your specific type, which lets you work with multiple types of records in the same stream.
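
    Since the question asked for a sample, here is a minimal sketch of the generic approach. It assumes the Confluent kafka-streams-avro-serde artifact is on the classpath and that a Schema Registry is in use. The topic name "events", the registry URL http://localhost:8081 and the class name GenericStreamExample are placeholders, not values from the question.

```java
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Collections;
import java.util.Map;

public class GenericStreamExample {

    // Placeholder values (assumptions): replace with your own topic and registry address.
    static final String STREAM_TOPIC = "events";
    static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";

    static Topology buildTopology() {
        // The serde looks up each record's writer schema in the Schema Registry.
        Map<String, String> serdeConfig =
                Collections.singletonMap("schema.registry.url", SCHEMA_REGISTRY_URL);

        // GenericAvroSerde is a Serde<GenericRecord>, so no generated class is needed.
        final Serde<GenericRecord> eventSerde = new GenericAvroSerde();
        eventSerde.configure(serdeConfig, false); // false = used for record values, not keys

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, GenericRecord> eventStream =
                builder.stream(STREAM_TOPIC, Consumed.with(Serdes.String(), eventSerde));

        // Each record carries its own schema, so the code can branch on it at runtime.
        eventStream.foreach((key, record) -> {
            String schemaName = record.getSchema().getFullName();
            System.out.println(key + " -> " + schemaName + ": " + record);
        });

        return builder.build();
    }

    public static void main(String[] args) {
        // Describing the topology does not require a running broker.
        System.out.println(buildTopology().describe());
    }
}
```

    Individual fields are then read by name, for example record.get("someField"), where someField is a hypothetical field name. The trade-off is that you lose the compile-time getters of the generated classes, so field names and types are only checked at runtime.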