I work with Micronauts, Kafka and Avro.
My Producer can send messages to Kafka without any problems. Unfortunately, my Consumer cannot read them correctly. Error is: io.micronaut.core.bind.exceptions.UnsatisfiedArgumentException: Required argument [EasySchema message] not specified
I use the following AVRO schema from which I generate a class:
{
"type" : "record",
"name" : "EasySchema",
"namespace" : "de.mydata.kafka.easy",
"fields" : [ {
"name" : "active",
"type" : "boolean"
}, {
"name" : "name",
"type" : "string"
}
]
}
I use this to send messages without problems. The message is successfully sent to Kafka, I see in my GUI
public void sendMessages() throws Exception {
EasySchema easy = new EasySchema();
easy.setActive(true);
easy.setName("Maria Klausen");
Random random = new Random();
random.nextInt(1000);
String randNumber = Integer.toString(random.nextInt(100000));
kafkaClientProducer.sendMessage("mykey" + randNumber, easy);
}
@KafkaClient
public interface KafkaClientProducer {
@Topic("with-easy-topic1")
void sendMessage(@KafkaKey String key, EasySchema message);
}
I currently consume the message like this:
@Topic("with-easy-topic1")
void receive(@KafkaKey String key, EasySchema message) {
System.out.println(key);
System.out.println(message);
}
I then receive the following error message:
ERROR i.m.c.k.e.KafkaListenerExceptionHandler - Error processing record [Optional[ConsumerRecord(topic = with-easy-topic1, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1689603813875, serialized key size = 16, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = mykey73556, value = {"active": true, "name": "Maria Klausen"})]] for Kafka consumer [de.mydata.CustomerMessageConsumer@186701a2] produced error: Required argument [EasySchema message] not specified
io.micronaut.core.bind.exceptions.UnsatisfiedArgumentException: Required argument [EasySchema message] not specified
If I would use a simple string instead of EasySchema, I don't get an error. What am I doing wrong? Any ideas?
I would like to use EasySchema directly.
my application.yml looks like this:
micronaut:
application:
name: customerMessageProducer
server:
port: 8092
netty:
default:
allocator:
max-order: 3
kafka:
bootstrap:
servers: localhost:9092
producers:
default:
key:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema:
registry:
url: http://127.0.0.1:8081
consumers:
default:
key:
deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value:
deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema:
registry:
url: http://127.0.0.1:8081
The first issue seems like a bug, but Micronaut has also documented this way. Or you could try adding @MessageBody
annotation
@Topic("...")
void receive(ConsumerRecord<String, EasySchema> record) {
String key = record.key();
EasySchema value = record.value();
}
Secondly, you need to tell the Confluent deserializer to use a specific record type
kafka:
consumers:
default:
specific.avro.reader: true