Search code examples
javaapache-kafkadeserializationavromicronaut

Micronaut, Kafka with Avro - Consumer Error: Required argument [EasySchema message] not specified


I work with Micronauts, Kafka and Avro.

My Producer can send messages to Kafka without any problems. Unfortunately, my Consumer cannot read them correctly. Error is: io.micronaut.core.bind.exceptions.UnsatisfiedArgumentException: Required argument [EasySchema message] not specified

I use the following AVRO schema from which I generate a class:

{
  "type" : "record",
  "name" : "EasySchema",
  "namespace" : "de.mydata.kafka.easy",
  "fields" : [ {
    "name" : "active",
    "type" : "boolean"
    }, {
      "name" : "name",
      "type" : "string"
    }
   ]
}

I use this to send messages without problems. The message is successfully sent to Kafka, I see in my GUI

    public void sendMessages() throws Exception {
        EasySchema easy = new EasySchema();
        easy.setActive(true);
        easy.setName("Maria Klausen");
        Random random = new Random();
        random.nextInt(1000);
        String randNumber = Integer.toString(random.nextInt(100000));
        kafkaClientProducer.sendMessage("mykey" + randNumber, easy);
    }

    @KafkaClient
    public interface KafkaClientProducer {
        @Topic("with-easy-topic1")
        void sendMessage(@KafkaKey String key, EasySchema message);
    }

I currently consume the message like this:

    @Topic("with-easy-topic1")
    void receive(@KafkaKey String key, EasySchema message) {
        System.out.println(key);
        System.out.println(message);
    }

I then receive the following error message:

ERROR i.m.c.k.e.KafkaListenerExceptionHandler - Error processing record [Optional[ConsumerRecord(topic = with-easy-topic1, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1689603813875, serialized key size = 16, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = mykey73556, value = {"active": true, "name": "Maria Klausen"})]] for Kafka consumer [de.mydata.CustomerMessageConsumer@186701a2] produced error: Required argument [EasySchema message] not specified
io.micronaut.core.bind.exceptions.UnsatisfiedArgumentException: Required argument [EasySchema message] not specified

If I would use a simple string instead of EasySchema, I don't get an error. What am I doing wrong? Any ideas?

I would like to use EasySchema directly.

my application.yml looks like this:

micronaut:
  application:
    name: customerMessageProducer
  server:
    port: 8092
netty:
  default:
    allocator:
      max-order: 3
kafka:
  bootstrap:
    servers: localhost:9092
  producers:
    default:
      key:
        serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      value:
        serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      schema:
        registry:
          url: http://127.0.0.1:8081
  consumers:
    default:
      key:
        deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      value:
        deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      schema:
        registry:
          url: http://127.0.0.1:8081

Solution

  • The first issue seems like a bug, but Micronaut has also documented this way. Or you could try adding @MessageBody annotation

    @Topic("...")
    void receive(ConsumerRecord<String, EasySchema> record) {
      String key = record.key();
      EasySchema value = record.value();
    }
    

    Secondly, you need to tell the Confluent deserializer to use a specific record type

    kafka:
        consumers:
            default:
                specific.avro.reader: true