I want to take a record created by the Avro deserializer and send it to Elasticsearch. I realize I have to write custom code to do this.
Using the LITERAL option, I have the JSON schema, which is the first step toward using a GenericRecord. However, looking through the Avro Java API, I see no way to use GenericRecord for a single record; all the examples use DataFileReader.
In short, I can't get the fields from the Flume event.
Has anyone done this before? TIA.
I was able to figure it out. I did the following:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

// Get the schema from the Flume event header
String strSchema = event.getHeader("flume.avro.schema.literal");
// Get the body
byte[] body = event.getBody();
// Parse the Avro schema (parse() is an instance method, so Schema.Parser must be instantiated)
Schema schema = new Schema.Parser().parse(strSchema);
// Get a decoder to read the record from the raw event bytes
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(body, null);
// Get the datum reader
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
// Read the Avro record in object form
GenericRecord record = reader.read(null, decoder);
// Now you can iterate over the fields
for (Schema.Field field : schema.getFields()) {
    Object value = record.get(field.name());
    // Code to add the field to the JSON sent to Elasticsearch not listed
    // ...
}
This works well.
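For the part elided in the loop above ("code to add field to JSON"), here is one rough sketch of building the JSON document by hand with only the JDK. The class name `AvroToJson`, the field names, and the values are all hypothetical stand-ins for whatever `record.get(field.name())` returns; in practice a JSON library such as Jackson is a safer choice, since this sketch only handles basic escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AvroToJson {
    // Build a minimal JSON document from field name/value pairs,
    // collected the same way as record.get(field.name()) in the loop above.
    static String toJson(Map<String, Object> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (!first) sb.append(',');
            first = false;
            sb.append('"').append(e.getKey()).append("\":");
            Object v = e.getValue();
            if (v instanceof Number || v instanceof Boolean) {
                sb.append(v);  // numbers and booleans go in unquoted
            } else {
                // Quote everything else; Avro strings come back as
                // org.apache.avro.util.Utf8, so toString() is the common denominator.
                sb.append('"').append(String.valueOf(v).replace("\"", "\\\"")).append('"');
            }
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("host", "web01");
        fields.put("bytes", 2048);
        System.out.println(toJson(fields));  // prints {"host":"web01","bytes":2048}
    }
}
```

The resulting string can then be posted to Elasticsearch's index API as the document body.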