Tags: java, scala, apache-kafka, avro, kafka-producer-api

Kafka Avro Serializer: org.apache.avro.AvroRuntimeException: not open


I am using Apache Kafka with the Avro serializer, using the specific format. I am trying to create my own custom class and use it as a Kafka message value. But when I try to send the message, I get the following exception:

Exception in thread "main" org.apache.avro.AvroRuntimeException: not open
    at org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:287)
    at com.harmeetsingh13.java.producers.avroserializer.AvroProducer.main(AvroProducer.java:57)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

My Avro schema file is as follows:

{
    "namespace": "customer.avro",
    "type": "record",
    "name": "Customer",
    "fields": [{
        "name": "id",
        "type": "int"
    }, {
        "name": "name",
        "type": "string"
    }]
}

Customer Class:

public class Customer {
    public int id;
    public String name;

    public Customer() {
    }

    public Customer(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

Data Serialization using Avro:

public static void fireAndForget(ProducerRecord<String, DataFileWriter> record) {
    kafkaProducer.send(record);
}

Customer customer1 = new Customer(1001, "James");

Parser parser = new Parser();
Schema schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("customer.avro"));

SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
DataFileWriter<Customer> dataFileWriter = new DataFileWriter<>(writer);
dataFileWriter.append(customer1);
dataFileWriter.close();

ProducerRecord<String, DataFileWriter> record1 = new ProducerRecord<>("CustomerCountry",
        "Customer One", dataFileWriter
);
fireAndForget(record1);

I want to use SpecificDatumWriter instead of the generic one. What is this error related to?


Solution

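  • Kafka takes a key/value pair and serializes it. You are passing a DataFileWriter as the value, but that is a writer object, not the data you want to serialize, so it is not going to work. The exception itself is thrown earlier, at dataFileWriter.append(customer1): a DataFileWriter has to be opened with create(schema, outputStream) before anything can be appended, and the code never calls it.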

    What you need to do is serialize the Avro record into a byte array using a BinaryEncoder and a ByteArrayOutputStream, and then pass that array to a ProducerRecord<String, byte[]> (with the producer's value.serializer set to Kafka's ByteArraySerializer):

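    import java.io.ByteArrayOutputStream;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.specific.SpecificDatumWriter;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // schema, customer1 and kafkaProducer are the objects from the question.
    SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
    ByteArrayOutputStream os = new ByteArrayOutputStream();

    try {
      // Encode the record as raw Avro binary (no container-file header).
      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
      writer.write(customer1, encoder);
      encoder.flush(); // push buffered bytes into the stream before reading it

      byte[] avroBytes = os.toByteArray();
      ProducerRecord<String, byte[]> record1 =
        new ProducerRecord<>("CustomerCountry", "Customer One", avroBytes);

      kafkaProducer.send(record1);
    } finally {
      os.close();
    }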
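
The encoding step above can also be pulled into a small helper and tried without a Kafka broker. The following is only a sketch under some assumptions: the class name AvroBytesSketch and the method toAvroBytes are made up for illustration, the schema from the question is inlined as a string, and ReflectDatumWriter is swapped in for SpecificDatumWriter. The reason for the swap is that SpecificDatumWriter is designed for classes generated from the schema by the Avro compiler, whereas the Customer class in the question is a hand-written POJO, which Avro's reflect API is meant to handle.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;

// Hypothetical helper class; the name and layout are illustrative only.
final class AvroBytesSketch {

    // Same schema as in the question, inlined so the sketch is self-contained.
    static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"namespace\": \"customer.avro\", \"type\": \"record\", \"name\": \"Customer\","
        + " \"fields\": [{\"name\": \"id\", \"type\": \"int\"},"
        + " {\"name\": \"name\", \"type\": \"string\"}]}");

    // Trimmed-down copy of the hand-written POJO from the question.
    static final class Customer {
        public int id;
        public String name;

        Customer(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Encodes one Customer as raw Avro binary, usable as a byte[] record value.
    static byte[] toAvroBytes(Customer customer) {
        // Assumption: the reflect writer is used because Customer is a plain POJO.
        ReflectDatumWriter<Customer> writer = new ReflectDatumWriter<>(SCHEMA);
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        try {
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
            writer.write(customer, encoder);
            encoder.flush(); // the encoder buffers, so flush before reading the stream
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return os.toByteArray();
    }

    public static void main(String[] args) {
        byte[] avroBytes = toAvroBytes(new Customer(1001, "James"));
        System.out.println("Encoded " + avroBytes.length + " bytes");
    }
}
```

The returned array can then be used as the value of a ProducerRecord<String, byte[]>, exactly as in the snippet above. Note that the bytes carry no schema, so the consumer needs the same schema to decode them.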