apache-nifi

Using AvroSchemaRegistry in Apache NiFi


I have five CSVReader controller services and one CSVRecordSetWriter. The readers are configured identically except for Schema Text, which differs because each file has different headers.

I want to keep only one CSVReader and set its schema text dynamically. I read about AvroSchemaRegistry, but I don't clearly understand how to use it.

Should I create five different AvroSchemaRegistry controller services, each with two attributes, name and value? For example, I want to add the following schema:

{
    "type": "record",
    "name": "campaigns",
    "namespace": "common",
    "fields": [
        {"name": "campaign_name", "type": "string"},
        {"name": "campaign_id", "type": "long"},
        {"name": "date", "type" : {"type": "int", "logicalType" : "date"}}
    ]
}

Do I create an AvroSchemaRegistry with:

  • name = campaigns;
  • value = the full schema above?

And for each of the other schemas, should I create another AvroSchemaRegistry controller service with its own name and value?

After that, how do I configure CSVReader and CSVRecordSetWriter to use these schemas? And finally, what should I do with the flowfiles? Add attributes to them? If so, which ones?


Solution

  • Configure your flow along these lines (adjust it to your requirements):

    [Screenshot: overall flow]

    1. UpdateAttribute: derive or hard-code the schema for each flowfile.

    [Screenshot: UpdateAttribute configuration]

    2. ValidateRecord: use the single generic CSVReader and pass the schema to it dynamically.

    [Screenshot: ValidateRecord configuration]

    3. CSVReader controller service: set the Schema Access Strategy so that the reader uses the dynamically passed schema.

    [Screenshot: CSVReader configuration]
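The three steps above can be modelled outside NiFi in plain Python. This is a minimal sketch, not NiFi code: it assumes the schema travels in the `avro.schema` flowfile attribute (the attribute that CSVReader's Schema Text property reads by default through `${avro.schema}`), and the `SCHEMAS_BY_PREFIX` table and filename-prefix rule are hypothetical stand-ins for whatever UpdateAttribute logic you actually use. The point it illustrates is that one generic reader can parse every file because it looks up the schema on each flowfile, instead of five readers each holding a hard-coded schema.

```python
import csv
import io
import json
from datetime import date

# Schema from the question; UpdateAttribute would set this text as avro.schema.
CAMPAIGNS_SCHEMA = json.dumps({
    "type": "record",
    "name": "campaigns",
    "namespace": "common",
    "fields": [
        {"name": "campaign_name", "type": "string"},
        {"name": "campaign_id", "type": "long"},
        {"name": "date", "type": {"type": "int", "logicalType": "date"}},
    ],
})

# Hypothetical routing table: filename prefix -> schema text.
SCHEMAS_BY_PREFIX = {"campaigns": CAMPAIGNS_SCHEMA}


def update_attribute(flowfile):
    """Stand-in for UpdateAttribute: attach avro.schema based on the filename."""
    prefix = flowfile["attributes"]["filename"].split("_")[0]
    flowfile["attributes"]["avro.schema"] = SCHEMAS_BY_PREFIX[prefix]
    return flowfile


def convert(value, avro_type):
    """Convert one CSV string for the few Avro types used in this sketch."""
    if isinstance(avro_type, dict) and avro_type.get("logicalType") == "date":
        return date.fromisoformat(value)  # assumes yyyy-MM-dd; shown as datetime.date
    if avro_type in ("int", "long"):
        return int(value)
    return value  # "string" (other types are not handled here)


def generic_csv_reader(flowfile):
    """Stand-in for ONE CSVReader whose schema comes from the avro.schema attribute."""
    schema = json.loads(flowfile["attributes"]["avro.schema"])
    types = {field["name"]: field["type"] for field in schema["fields"]}
    rows = csv.DictReader(io.StringIO(flowfile["content"]))
    return [{k: convert(v, types[k]) for k, v in row.items()} for row in rows]


# Usage: a hypothetical flowfile passing through both steps.
ff = {
    "attributes": {"filename": "campaigns_2020.csv"},
    "content": "campaign_name,campaign_id,date\nspring,42,2020-03-01\n",
}
records = generic_csv_reader(update_attribute(ff))
```

Adding a new file type in this model means adding one entry to the routing table, not another reader, which mirrors the goal of keeping a single CSVReader.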

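    If you would rather use a NiFi-supported schema registry, put all of your schemas in the registry, set the schema.name attribute on each flowfile, and set the Schema Access Strategy properties of the record reader/writer so that they look up the schema in the registry. First, though, you need to add and configure the schema registry as a controller service. With AvroSchemaRegistry you need only one such controller service: add one dynamic property per schema, with the schema name as the property name and the schema text as the value.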
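The registry approach can also be modelled in a few lines of Python. This is a hypothetical sketch of the lookup behaviour, not NiFi's implementation: `AvroSchemaRegistrySketch` stands for a single AvroSchemaRegistry controller service whose dynamic properties map schema names to schema text, and `resolve_schema` stands for a reader (or writer) with Schema Access Strategy "Use 'Schema Name' Property" and Schema Name set to `${schema.name}`. The second schema, `ad_groups`, is invented for illustration. On the writer side, recent NiFi versions also offer an "Inherit Record Schema" strategy, which avoids a second lookup.

```python
import json


class AvroSchemaRegistrySketch:
    """Hypothetical model of ONE AvroSchemaRegistry controller service.

    Each dynamic property maps a schema name (property name) to Avro schema
    text (property value)."""

    def __init__(self, dynamic_properties):
        self._schemas = {}
        for name, text in dynamic_properties.items():
            schema = json.loads(text)
            # This sketch only checks for a JSON record; it is not full Avro validation.
            if schema.get("type") != "record":
                raise ValueError(f"schema {name!r} is not an Avro record")
            self._schemas[name] = schema

    def lookup(self, schema_name):
        """Return the schema registered under schema_name."""
        try:
            return self._schemas[schema_name]
        except KeyError:
            raise KeyError(f"no schema registered under {schema_name!r}") from None


def resolve_schema(registry, flowfile_attributes):
    """Stand-in for a reader/writer using "Use 'Schema Name' Property"
    with Schema Name = ${schema.name}."""
    return registry.lookup(flowfile_attributes["schema.name"])


# One registry, one dynamic property per schema ("ad_groups" is made up).
registry = AvroSchemaRegistrySketch({
    "campaigns": json.dumps({
        "type": "record", "name": "campaigns", "namespace": "common",
        "fields": [
            {"name": "campaign_name", "type": "string"},
            {"name": "campaign_id", "type": "long"},
            {"name": "date", "type": {"type": "int", "logicalType": "date"}},
        ],
    }),
    "ad_groups": json.dumps({
        "type": "record", "name": "ad_groups", "namespace": "common",
        "fields": [{"name": "ad_group_id", "type": "long"}],
    }),
})

# UpdateAttribute would set schema.name per flowfile; the same reader serves both.
campaigns = resolve_schema(registry, {"schema.name": "campaigns"})
ad_groups = resolve_schema(registry, {"schema.name": "ad_groups"})
```

The design choice this reflects is that the per-file difference lives in a flowfile attribute (a name), while the schemas themselves live in one shared place.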

    Update:

    Below is a sample schema registry controller service configuration for HortonworksSchemaRegistry (ConfluentSchemaRegistry is configured in much the same way; AvroSchemaRegistry needs no URL because it stores the schemas itself). What matters is which parameter the registry API expects when it performs a lookup: pass that same value in the Schema Name property of the reader/writer controller service. The screenshot below should give you an idea.

    Schema Registry URL: http://example.com:7788/api/v1

    [Screenshot: HortonworksSchemaRegistry configuration]