java, apache-kafka, avro

Convert a Java POJO to an Avro schema object, then send it to a Kafka server with KafkaTemplate


In our REST API we receive a complex JSON payload and map it to a POJO. Separately, I use avro-maven-plugin to generate Avro classes from the .avsc schema file.

My question: when we send a message to Kafka and the schema registry with KafkaTemplate, we need to send it as an Avro schema object. Manually mapping values from the request payload object into the Avro schema object is not practical because of the huge number of fields.


Solution

  • Converting any POJO class to an Avro GenericRecord takes two steps:

    1. Use Jackson's Avro module (AvroMapper) to serialize the POJO into Avro-encoded bytes.

    2. Use Avro's GenericDatumReader to read those bytes back as a GenericRecord.

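    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DecoderFactory;
    import org.springframework.core.io.ClassPathResource;
    import com.fasterxml.jackson.databind.ObjectWriter;
    import com.fasterxml.jackson.dataformat.avro.AvroMapper;
    import com.fasterxml.jackson.dataformat.avro.AvroSchema;

    public class AvroConverter {

        public static GenericRecord convertToGenericRecord(String schemaPath, SomeClass someObject) throws IOException {
            // Parse the .avsc schema file from the classpath.
            Schema schema = new Schema.Parser().setValidate(true).parse(new ClassPathResource(schemaPath).getFile());
            // Step 1: serialize the POJO to Avro binary with Jackson's AvroMapper.
            ObjectWriter objectWriter = new AvroMapper().writer(new AvroSchema(schema));
            final byte[] bytes = objectWriter.writeValueAsBytes(someObject);
            // Step 2: decode the bytes into a GenericRecord using the same Avro schema.
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
        }
    }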
    

    Gradle Dependency

        implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-avro'
    

    You may run into issues during serialization. If so, configure the AvroMapper's properties to resolve them.
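    The answer does not say which mapper properties it means. As one possible illustration, the sketch below assumes the issue is a POJO property that the Avro schema does not declare, and enables Jackson's JsonGenerator.Feature.IGNORE_UNKNOWN so that such properties are skipped. The Customer class, its fields, and the inlined schema are hypothetical and exist only to keep the example self-contained; your own failure may need a different setting.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;

public class AvroMapperConfigSketch {

    // Hypothetical POJO: "internalNote" has no counterpart in the schema below.
    public static class Customer {
        public String name = "Ada";
        public String internalNote = "not part of the Avro schema";
    }

    // Hypothetical one-field schema, inlined instead of being read from an .avsc file.
    static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"Customer\","
        + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";

    // Serializes a Customer with the configured mapper, reads it back as a
    // GenericRecord, and returns the value of its "name" field.
    public static String roundTripName() {
        try {
            Schema schema = new Schema.Parser().parse(SCHEMA_JSON);

            AvroMapper mapper = new AvroMapper();
            // Assumed fix: skip POJO properties the schema does not declare
            // instead of failing when the generator encounters them.
            mapper.configure(JsonGenerator.Feature.IGNORE_UNKNOWN, true);

            byte[] bytes = mapper.writer(new AvroSchema(schema))
                                 .writeValueAsBytes(new Customer());

            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            GenericRecord record =
                reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
            return record.get("name").toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTripName());
    }
}
```

    If this is the setting you need, the same configure call could be applied to the AvroMapper inside convertToGenericRecord before the writer is created.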