I am creating FlowFiles that are manipulated and split downstream after being emitted by an ExecuteSQL processor. I have populated the FlowFiles' attributes with data that I want to put into the Avro metadata contained within each FlowFile's content.
How can I do this?
I've tried using an UpdateRecord processor configured with an AvroReader and AvroRecordSetWriter and a property with a key of /canary that should be writing a FlowFile attribute to that key somewhere in the Avro document. It does not appear anywhere in the output, though.
It would be acceptable to move the records in the Avro data to a subkey and have a metadata section be a part of the record data. I would prefer not to do this, though, because it does not seem like the correct solution and because it sounds much more complex than simply modifying the Avro metadata.
The record-aware processors (and the Readers/Writers) are not metadata-aware, meaning they cannot currently (as of NiFi 1.5.0) act on metadata in any way (inspect, create, delete, etc.), so UpdateRecord won't work for metadata per se. With your /canary property key, it will try to insert a top-level field named canary into your Avro record, with the value you specify. However, I believe your output schema needs to have the canary field added at the top level, or the field may be ignored (I'm not positive about this; you can check the output schema to see whether it is added automatically).
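To make the distinction concrete, here is a minimal standalone sketch (not part of the NiFi flow) that writes an Avro file in memory with both a record field named canary and a file-level metadata entry named canary, then reads each back. The schema, the record name `row`, the `id` field, and the sample values are all hypothetical and chosen only for illustration; it assumes Avro 1.8.1 can be fetched via @Grab.

```groovy
@Grab('org.apache.avro:avro:1.8.1')
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.file.DataFileStream
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.generic.GenericRecord

// Hypothetical schema: one original column plus a top-level "canary" field
Schema schema = SchemaBuilder.record('row').fields().requiredInt('id').requiredString('canary').endRecord()

def out = new ByteArrayOutputStream()
def writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema))

// File-level metadata lives in the container header, not in any record.
// It must be set before create() writes the header.
writer.setMeta('canary', 'from-header')
writer.create(schema, out)

// Record-level field: this is what a /canary record path refers to
GenericRecord record = new GenericData.Record(schema)
record.put('id', 1)
record.put('canary', 'from-record')
writer.append(record)
writer.close()

// Read both values back: they are stored independently
def reader = new DataFileStream<GenericRecord>(
    new ByteArrayInputStream(out.toByteArray()), new GenericDatumReader<GenericRecord>())
assert reader.getMetaString('canary') == 'from-header'
assert reader.next().get('canary').toString() == 'from-record'
reader.close()
```

The two canary values never collide because one is part of the header map and the other is part of each serialized record, which is why a record path cannot reach the header.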
There is currently no NiFi processor that can update Avro metadata explicitly (MergeContent handles some metadata when merging various Avro files together, but it does not let you set a value of your choosing). However, I have an unpolished Groovy script you could use in ExecuteScript to add metadata to Avro files in NiFi 1.5.0+. In ExecuteScript you would set the language to Groovy and the following as the Script Body, then add user-defined (aka "dynamic") properties to ExecuteScript, where the property name will be the metadata key, and the evaluated value (the properties support Expression Language) will be the metadata value:
@Grab('org.apache.avro:avro:1.8.1')
import org.apache.avro.*
import org.apache.avro.file.*
import org.apache.avro.generic.*

def flowFile = session.get()
if(!flowFile) return

try {
    // Save off dynamic property values for metadata key/values later
    def metadata = [:]
    context.properties.findAll {e -> e.key.dynamic}.each {k,v -> metadata.put(k.name, context.getProperty(k).evaluateAttributeExpressions(flowFile).value.bytes)}

    flowFile = session.write(flowFile, {inStream, outStream ->
        DataFileStream<GenericRecord> reader = new DataFileStream<>(inStream, new GenericDatumReader<GenericRecord>())
        DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>())
        def schema = reader.schema
        def inputCodec = reader.getMetaString(DataFileConstants.CODEC) ?: DataFileConstants.NULL_CODEC

        // Forward the existing metadata to the output
        reader.metaKeys.each { key ->
            if (!DataFileWriter.isReservedMeta(key)) {
                byte[] metadatum = reader.getMeta(key)
                writer.setMeta(key, metadatum)
            }
        }

        // For each dynamic property, set the key/value pair as Avro metadata
        metadata.each {k,v -> writer.setMeta(k,v)}

        // Keep the input codec, write the header, then copy the data blocks as-is
        writer.setCodec(CodecFactory.fromString(inputCodec))
        writer.create(schema, outStream)
        writer.appendAllFrom(reader, false)
    } as StreamCallback)

    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error('Error adding Avro metadata, penalizing flow file and routing to failure', e)
    flowFile = session.penalize(flowFile)
    session.transfer(flowFile, REL_FAILURE)
}
Note that this script can also work with versions of NiFi earlier than 1.5.0, but the @Grab at the top is not supported until 1.5.0, so you'd have to download Avro and its dependencies into a flat folder and point to that folder in the Module Directory property of ExecuteScript.
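If you want to confirm that the metadata actually landed, one option is a second ExecuteScript placed after the first. The following is a rough sketch under the same assumptions as the script above (NiFi 1.5.0+, Groovy engine, the usual `session`, `log`, `REL_SUCCESS` and `REL_FAILURE` bindings); it only reads the Avro header and logs every metadata key it finds, leaving the content untouched. Values are decoded as UTF-8 strings, so binary metadata may not print meaningfully.

```groovy
@Grab('org.apache.avro:avro:1.8.1')
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.nifi.processor.io.InputStreamCallback

def flowFile = session.get()
if (!flowFile) return

try {
    // Collect header metadata (this includes reserved keys such as avro.schema)
    def found = [:]
    session.read(flowFile, { inStream ->
        def reader = new DataFileStream<GenericRecord>(inStream, new GenericDatumReader<GenericRecord>())
        reader.metaKeys.each { key -> found[key] = reader.getMetaString(key) }
    } as InputStreamCallback)

    log.info('Avro metadata found: ' + found)
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error('Could not read Avro metadata, routing to failure', e)
    session.transfer(flowFile, REL_FAILURE)
}
```

The keys you added as dynamic properties should show up in the logged map next to avro.schema and, if the file is compressed, avro.codec.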