
MergeRecord based on schema; only merge records of the same schema


My use-case is:

  1. Have API credentials
  2. Use UpdateAttribute to set (1) the schema and (2) the S3 location/bucket, etc.
  3. Query API endpoint for records
  4. Paginate API for more records
  5. Call MergeRecord
  6. Parquet out to PutS3Object

Since steps 3, 4, 5 and 6 are the same for every schema, I am re-using the same processors (see screenshot). My problem is that MergeRecord (step 5) will try to merge records with different schemas, because FlowFiles arrive from the funnel in arbitrary order.

How can I restructure this? I'd like to re-use processors as much as possible (DRY), but still be able to add more schemas as my needs evolve.

[Screenshot: my NiFi flows]


Solution

  • I set MergeRecord's Correlation Attribute Name property to ${schema.name}, and it's working as expected.

    https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.5.0/org.apache.nifi.processors.standard.MergeRecord/index.html

    The documentation describes this property as follows: "If specified, two FlowFiles will be binned together only if they have the same value for this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue."
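
    To illustrate the binning behaviour that quote describes, here is a minimal conceptual sketch in Python. It is not NiFi code and not how MergeRecord is implemented; the dict layout for a FlowFile and the helper name bin_by_attribute are assumptions made up for this example. It only shows that grouping on one attribute value keeps records of different schemas in separate bins, whatever order they arrive in.

```python
from collections import defaultdict


def bin_by_attribute(flowfiles, attribute_name):
    """Group FlowFile-like dicts by the value of one attribute.

    Hypothetical helper: mimics the idea of a correlation attribute,
    not NiFi's actual binning logic (no bin size or age limits here).
    """
    bins = defaultdict(list)
    for flowfile in flowfiles:
        key = flowfile["attributes"].get(attribute_name)
        bins[key].append(flowfile)
    return dict(bins)


# FlowFiles arriving from the funnel in mixed order (made-up sample data).
queue = [
    {"attributes": {"schema.name": "users"}, "records": [{"id": 1}]},
    {"attributes": {"schema.name": "orders"}, "records": [{"order": 10}]},
    {"attributes": {"schema.name": "users"}, "records": [{"id": 2}]},
]

bins = bin_by_attribute(queue, "schema.name")

# "Merge" each bin by concatenating its records; schemas never mix.
merged = {
    name: [record for ff in ffs for record in ff["records"]]
    for name, ffs in bins.items()
}
```

    Adding a new schema then only needs a new schema.name value upstream; the shared merge step stays unchanged.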