Tags: java, avro, flume-ng

Flume: Avro event deserializer to Elasticsearch


I want to take a record produced by Flume's AVRO event deserializer and send it to Elasticsearch. I realize I have to write custom code to do this.

Using the LITERAL schema option, I get the JSON schema in an event header, which is the first step toward using GenericRecord. However, looking through the Avro Java API, I see no way to read a single record into a GenericRecord; all the examples use DataFileReader.

In short, I can't get the fields from the Flume event.

Has anyone done this before? TIA.


Solution

I was able to figure it out. I did the following:

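    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.flume.Event;

    // Called for each Flume event in your custom sink/serializer ("handle" is an illustrative name)
    void handle(Event event) throws IOException {
        // Get the schema (the AVRO deserializer sets this header when schemaType = LITERAL)
        String strSchema = event.getHeaders().get("flume.avro.schema.literal");
        // Get the body: a single binary-encoded Avro datum, with no container-file header,
        // which is why DataFileReader is not needed here
        byte[] body = event.getBody();

        // Create the Avro schema from the JSON literal
        Schema schema = new Schema.Parser().parse(strSchema);

        // Get a decoder that reads the "record" directly from the raw bytes
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(body, null);

        // Get the datum reader
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);

        // Get the Avro record in object form
        GenericRecord record = reader.read(null, decoder);

        // Now you can iterate over the fields
        for (Schema.Field field : schema.getFields()) {
            Object value = record.get(field.name());

            // Code to add the field to the JSON sent to Elasticsearch is not listed
            // ...
        }
    }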
    

This works well.
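The answer leaves out the step that turns the decoded field values into a JSON document. Below is a minimal, dependency-free sketch of that step, assuming the values come straight from `record.get(...)`: `CharSequence` (Avro's `Utf8`), boxed `Integer`/`Long`/`Float`/`Double`, `Boolean`, `null`, `ByteBuffer` for `bytes`, `List` for arrays and `Map` for maps. `FieldJson` and `toJson` are hypothetical names, not part of Avro or Flume. Nested records are not walked (they would be written as their `toString()` text), so convert them to a `Map` first, for example with the same `schema.getFields()` loop. A real JSON library would handle more edge cases.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Collection;
import java.util.Map;

// Hypothetical helper (not part of Avro or Flume): writes the values that
// GenericRecord.get(...) returns as a JSON document for Elasticsearch.
final class FieldJson {

    private FieldJson() {}

    // Serializes field name -> value pairs as a JSON object, in the map's iteration order.
    static String toJson(Map<String, ?> fields) {
        StringBuilder sb = new StringBuilder();
        appendValue(sb, fields);
        return sb.toString();
    }

    private static void appendValue(StringBuilder sb, Object value) {
        if (value == null) {
            sb.append("null");
        } else if (value instanceof Boolean) {
            sb.append(value);
        } else if (value instanceof Double || value instanceof Float) {
            // JSON has no NaN or Infinity literals, so write those as null
            double d = ((Number) value).doubleValue();
            sb.append(Double.isFinite(d) ? value.toString() : "null");
        } else if (value instanceof Number) {
            sb.append(value); // Integer and Long (Avro int/long)
        } else if (value instanceof ByteBuffer) {
            // Avro "bytes": Base64-encode the remaining bytes without moving the buffer's position
            ByteBuffer dup = ((ByteBuffer) value).duplicate();
            byte[] raw = new byte[dup.remaining()];
            dup.get(raw);
            appendString(sb, Base64.getEncoder().encodeToString(raw));
        } else if (value instanceof Map) {
            // Avro maps (keys may be Utf8, so convert them with String.valueOf)
            sb.append('{');
            boolean first = true;
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                if (!first) sb.append(',');
                first = false;
                appendString(sb, String.valueOf(e.getKey()));
                sb.append(':');
                appendValue(sb, e.getValue());
            }
            sb.append('}');
        } else if (value instanceof Collection) {
            // Avro arrays (GenericData.Array implements List)
            sb.append('[');
            boolean first = true;
            for (Object item : (Collection<?>) value) {
                if (!first) sb.append(',');
                first = false;
                appendValue(sb, item);
            }
            sb.append(']');
        } else {
            // Utf8 / String, enum symbols, and anything else become JSON strings
            appendString(sb, value.toString());
        }
    }

    // Writes s as a quoted JSON string, escaping quotes, backslashes and control characters.
    private static void appendString(StringBuilder sb, String s) {
        sb.append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        sb.append('"');
    }
}
```

Inside the answer's loop, you could put each `field.name()` and `value` into a `LinkedHashMap<String, Object>` (which keeps the schema's field order) and call `FieldJson.toJson(doc)` once after the loop; the resulting string is the document body to index.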