avro, apache-nifi

How can I write FlowFile attributes to Avro metadata inside the FlowFile's content?


I am creating FlowFiles that are emitted by an ExecuteSQL processor and then manipulated and split downstream. I have populated the FlowFiles' attributes with data that I want to put into the Avro metadata contained within each FlowFile's content.

How can I do this?

I've tried using an UpdateRecord processor configured with an AvroReader and AvroRecordSetWriter and a property with a key of /canary that should be writing a FlowFile attribute to that key somewhere in the Avro document. It does not appear anywhere in the output, though.

It would be acceptable to move the records in the Avro data to a subkey and have a metadata section be a part of the record data. I would prefer not to do this, though, because it does not seem like the correct solution and because it sounds much more complex than simply modifying the Avro metadata.


Solution

  • The record-aware processors (and their Readers/Writers) are not metadata-aware: as of NiFi 1.5.0 they cannot inspect, create, or delete Avro metadata, so UpdateRecord won't work for metadata per se. With your /canary property key, it will try to insert a top-level field named canary into each Avro record, with the value you specify. However, I believe your output schema needs to include the canary field at the top level, or the field may be ignored (I'm not positive about this; you can check the output schema to see whether it is added automatically).
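
    If the field does turn out to be dropped, the writer's schema would presumably need an entry for it. A minimal sketch of such a schema follows; the record name and the id field are hypothetical placeholders for whatever ExecuteSQL actually produced, and only the canary entry relates to the /canary property:

    ```json
    {
      "type": "record",
      "name": "ExampleRecord",
      "fields": [
        {"name": "id", "type": "int"},
        {"name": "canary", "type": ["null", "string"], "default": null}
      ]
    }
    ```

    Declaring canary as a nullable string with a null default keeps records that lack the field valid against the same schema.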

    There is currently no NiFi processor that can update Avro metadata explicitly (MergeContent handles some metadata when merging Avro files together, but you can't use it to set a value of your choosing). However, I have an unpolished Groovy script you could use in ExecuteScript to add metadata to Avro files in NiFi 1.5.0+. In ExecuteScript, set the Script Engine to Groovy and use the following as the Script Body, then add user-defined (aka "dynamic") properties to ExecuteScript: each property's name becomes a metadata key, and its evaluated value (the properties support Expression Language) becomes the metadata value:

    @Grab('org.apache.avro:avro:1.8.1')
    import org.apache.avro.*
    import org.apache.avro.file.*
    import org.apache.avro.generic.*
    // Explicit import of the callback type used in session.write() below
    import org.apache.nifi.processor.io.StreamCallback
    
    def flowFile = session.get()
    if(!flowFile) return
    
    try {
       // Save off dynamic property values for metadata key/values later
       def metadata = [:]
       context.properties.findAll {e -> e.key.dynamic}.each {k,v ->
          // Encode as UTF-8 so the bytes do not depend on the platform default charset
          metadata.put(k.name, context.getProperty(k).evaluateAttributeExpressions(flowFile).value.getBytes('UTF-8'))
       }
    
       flowFile = session.write(flowFile, {inStream, outStream ->
          DataFileStream<GenericRecord> reader = new DataFileStream<>(inStream, new GenericDatumReader<GenericRecord>())
          DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>())
          def schema = reader.schema
          // Reuse the input file's compression codec, falling back to "null" (uncompressed)
          def inputCodec = reader.getMetaString(DataFileConstants.CODEC) ?: DataFileConstants.NULL_CODEC
          // Forward the existing (non-reserved) metadata to the output
          reader.metaKeys.each { key ->
             if (!DataFileWriter.isReservedMeta(key)) {
                byte[] metadatum = reader.getMeta(key)
                writer.setMeta(key, metadatum)
             }
          }
          // For each dynamic property, set the key/value pair as Avro metadata
          metadata.each {k,v -> writer.setMeta(k,v)}
          writer.setCodec(CodecFactory.fromString(inputCodec))
          // Metadata and codec must be set before create() writes the file header
          writer.create(schema, outStream)
          // Copy the data blocks as-is (false = do not recompress)
          writer.appendAllFrom(reader, false)
          // Push out anything still buffered before the callback returns
          writer.flush()
       } as StreamCallback)
    
       session.transfer(flowFile, REL_SUCCESS)
    } catch(e) {
       log.error('Error adding Avro metadata, penalizing flow file and routing to failure', e)
       flowFile = session.penalize(flowFile)
       session.transfer(flowFile, REL_FAILURE)
    }
    

    Note that this script can also work with versions of NiFi earlier than 1.5.0, but the @Grab at the top is not supported until 1.5.0, so you would have to download Avro and its dependencies into a flat folder and point the Module Directory property of ExecuteScript at that folder.
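
To check that metadata set this way can be read back, a standalone Groovy script (run outside NiFi) along these lines may help. It is only a sketch: the one-field schema, the canary key, and the value hello are hypothetical, and it writes a tiny Avro file into memory rather than touching a real FlowFile.

```groovy
@Grab('org.apache.avro:avro:1.8.1')
import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.generic.GenericRecord

// Hypothetical one-field schema, used only for this demonstration
def schema = new Schema.Parser().parse('''
{"type": "record", "name": "Example",
 "fields": [{"name": "id", "type": "int"}]}
''')

// Write a small Avro container file into memory with one user metadata entry
def buffer = new ByteArrayOutputStream()
def writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema))
writer.setMeta('canary', 'hello')   // must be set before create() writes the header
writer.create(schema, buffer)
def record = new GenericData.Record(schema)
record.put('id', 1)
writer.append(record)
writer.close()

// Read the file back and collect only the non-reserved (user) metadata keys
def reader = new DataFileStream<GenericRecord>(
    new ByteArrayInputStream(buffer.toByteArray()),
    new GenericDatumReader<GenericRecord>())
def userMeta = reader.metaKeys
    .findAll { !DataFileWriter.isReservedMeta(it) }
    .collectEntries { [(it): reader.getMetaString(it)] }
reader.close()

println userMeta
```

To inspect a file produced by the flow instead, the ByteArrayInputStream could be swapped for an input stream over that file; reserved keys such as the schema and codec entries are filtered out so that only the keys added through dynamic properties are listed.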