Search code examples
javaapache-kafkaavro

Kafka: Manually producing record triggers exception in consumer


I have a consumer which polls from multiple topics. Until now I only produced record into these topics with Java and everything worked fine.

I use the confulent tools with avro.

Now I tried to manually produce a topic via the terminal.

I create a avro-producer with the same schema my other producer uses:

# Produce a record with one field
kafka-avro-console-producer \
  --broker-list 127.0.0.1:9092 --topic order_created-in \
  --property schema.registry.url=http://127.0.0.1:8081 \
  --property value.schema='{"type":"record","name":"test","fields":[{"name":"name","type":"string"},{"name":"APropertie","type":{"type":"array","items":{"type":"record","name":"APropertie","fields":[{"name":"key","type":"string"},{"name":"name","type":"string"},{"name":"date","type":"string"}]}}}]}'

After that I produce a record which follows the specified schema:

{"name": "order_created", "APropertie": [{"key": "1", "name": "testname", "date": "testdate"}]}

The record gets correctly produced to the topic. But my AvroConsumer throws an exception:

Polling
Polling
Polling
Polling
Polling
Polling
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition order_created-in-0 at offset 1. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class test specified in writer's schema whilst finding reader's schema for a SpecificRecord.

Process finished with exit code 1

Any hints? Thanks!


Solution

  • It has something to do with the config of the producer / consumer.

    Normal producers have a config like this:

            // normal producer
            properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
            properties.setProperty("acks", "all");
            properties.setProperty("retries", "10");
    

    Avro normally adds the following properties:

            // avro part
            properties.setProperty("key.serializer", StringSerializer.class.getName());
            properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
            properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
            properties.setProperty("confluent.value.schema.validation", "true");
            properties.setProperty("confluent.key.schema.validation", "true");
    

    These have to be included in the the console producer.