I have 2 schemas:
Event.avsc:
{
"type": "record",
"namespace": "com.onemount.jobs.transform.schema.avro",
"name": "Event",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "mtp_interest_submit",
"type": ["null", "InterestSubmitParam"],
"default": null
}
]
}
InterestSubmitParam.avsc:
{
"type": "record",
"namespace": "com.onemount.jobs.transform.schema.avro",
"name": "InterestSubmitParam",
"fields": [
{
"name": "interest",
"type": {
"type": "array",
"items": "string"
}
}
]
}
I'm consuming Avro messages from Kafka Confluent (with specific.avro.reader=false
) and need to convert from GenericRecord
to ObjectNode
. This is the result:
{
"id": "c8b76e58-9803-4c78-9f82-a185bda1cabf",
"mtp_interest_submit": {
"com.onemount.jobs.transform.schema.avro.InterestSubmitParam": {
"interest": [
"fashion",
"travel"
]
}
}
}
But I'm expected it should be:
{
"id": "c8b76e58-9803-4c78-9f82-a185bda1cabf",
"mtp_interest_submit": {
"interest": [
"fashion",
"travel"
]
}
}
How can I fix it. This is my converter code:
GenericRecord genericRecord = ...
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(genericRecord.getSchema());
JsonEncoder encoder = EncoderFactory.get().jsonEncoder(genericRecord.getSchema(), outputStream);
writer.write(genericRecord, encoder);
encoder.flush();
return new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
}
Thanks a lot!
By using jackson-dataformat-avro
, the problem has been resolved:
ObjectMapper mapper = new ObjectMapper(new AvroFactory());
GenericRecord genericRecord = ...;
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(genericRecord.getSchema());
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
writer.write(genericRecord, encoder);
encoder.flush();
byte[] bytes = outputStream.toByteArray();
return mapper.readerFor(ObjectNode.class)
.with(new AvroSchema(genericRecord.getSchema()))
.readValue(bytes);
}
pom.xml:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-avro</artifactId>
<version>2.12.3</version>
</dependency>