I have 5 different CSVReader
controller services. Their configuration is the same except schema text
(because different headers) and 1 CSVRecordSetWriter
.
I want to leave only one CSVReader
and set schema text
dynamically. I read about AvroSchemaRegistry
but I didn't clearly understand how to use it.
Should i create 5 different AvroSchemaRegistry
controllers with 2 attributes: name
and value
?
F.e. i want to put following schema:
{
"type": "record",
"name": "campaigns",
"namespace": "common",
"fields": [
{"name": "campaign_name", "type": "string"},
{"name": "campaign_id", "type": "long"},
{"name": "date", "type" : {"type": "int", "logicalType" : "date"}}
]
}
I should create AvroSchemaRegistry
with:
name
= campaigns;value
= Full Code above.For another schema i should create another AvroSchemaResgitry
controller with another attributes name
and value
?
After that, how to configure CSVReader
and CSVRecordSetWriter
to work with these schemas? And finally, what should i do with flowfiles? Add additional attributes? What kind of?
Configure your flow something like this (make changes as per your requirement) ,
UpdateAttribute
configuration to derive/hard code flowfile specific schema-ValidateRecord
configuration to use generic csv reader and pass schema dynamically -CSVReader
controller service to use dynamically passed schema and set schema access strategy -If you wish to use NiFi supported schema registry then put all of your schemas in registry and set schema.name
and access strategy
properties for record reader/writer to access schema from registry, but first you need to add/configure schema registry provider in controller service.
Update:
Sample SchemaRegistry controller service configuration for HortonworksSchemaRegistry
(ConfluentSchemaRegistry and AvroSchemaRegistry have more or less the same properties). The important thing is what parameter registry API expect when lookup operation being performed, so that we just need to pass the same value from Reader/Writer controller service Schema Name
property, please refer to this to get an idea.
Schema Registry URL : http://example.com:7788/api/v1