I want to transfer my NiFi ETL pipeline (mainly PutSQL processors) from my development instance to my production instance of Apache NiFi, ideally with Python for reusability.
As a first attempt, I tried to simply copy the processors over.
Code:
import json
import requests

# GET processor from dev and parse to JSON
response = requests.get(nifi_url_dev + 'processors/' + proc_id, headers=header)
processor = json.loads(response.content)
# PUT processor to prod
processor['revision']['version'] = 0  # reset version
payload = json.dumps(processor).encode('utf8')
response = requests.put(nifi_url_prd + 'processors/' + proc_id, data=payload, headers=header)
This failed on the PUT with an HTTP 409 Conflict error. I am guessing this is because I am trying to PUT a resource to a URI that expects a resource to already exist at that location.
The documentation lists "Create a processor, Set properties, Schedule" next to the processor APIs, but when looking into it, there is no dedicated API for creation. I decided to go with PUT because it says "Updates a processor", which is the closest thing I can see in there to creating a new one from scratch.
Do you have any ideas on how to create processors with Python? Either by copying existing ones or creating entirely new ones?
So the problem was that the API documentation is a bit misleading...
The correct API to create a new processor is a POST to process-groups/{process_group_id}/processors. It is listed in the docs under "Process Groups" and not under "Processors", despite the description.
The following worked for me. It was necessary to adapt the JSON a bit, though, mainly to delete any IDs from the dev environment.
# GET processor from dev and parse to JSON
response = requests.get(nifi_url_dev + 'processors/' + proc_id, headers=header)
processor = json.loads(response.content)
# Remove the dev-specific identifiers before creating the processor on prod
del processor["id"]  # Processor ID cannot be specified.
del processor["uri"]  # The URI points at the dev instance.
del processor["component"]["id"]  # Processor ID cannot be specified.
del processor["component"]["parentGroupId"]  # Parent process group ID should not be specified.
# If specified, the parent process group ID must be the same as in the POST URI.
processor['revision']['version'] = 0  # reset version
payload = json.dumps(processor).encode('utf8')
# POST processor to the target process group on prod
response = requests.post(nifi_url_prd + 'process-groups/' + process_group + '/processors', data=payload, headers=header)
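For re-usability, the clean-up step above could be pulled into small helpers. This is only a sketch under assumptions: the function names `prepare_for_create` and `create_url` are made up here, the sample entity contains placeholder values rather than real NiFi output, and it only covers the four keys and the revision reset from the snippet above. It does no network I/O, so the result would still be sent with `requests.post` as shown above.

```python
import copy
import json


def prepare_for_create(entity):
    """Return a copy of a processor entity with the source-instance IDs removed.

    Hypothetical helper: mirrors the manual `del` statements above.
    """
    new_entity = copy.deepcopy(entity)  # leave the caller's dict untouched
    # Top-level identifiers that belong to the source (dev) instance.
    new_entity.pop("id", None)
    new_entity.pop("uri", None)
    # Identifiers nested in the component definition.
    component = new_entity.get("component", {})
    component.pop("id", None)
    component.pop("parentGroupId", None)
    # Reset the revision version, as in the snippet above.
    new_entity.setdefault("revision", {})["version"] = 0
    return new_entity


def create_url(base_url, process_group_id):
    """Build the creation endpoint; tolerates a trailing slash on base_url."""
    return base_url.rstrip("/") + "/process-groups/" + process_group_id + "/processors"


# Placeholder entity for illustration only (not real NiFi output).
sample = {
    "id": "dev-proc-id",
    "uri": "http://dev.example/nifi-api/processors/dev-proc-id",
    "revision": {"version": 7},
    "component": {"id": "dev-proc-id", "parentGroupId": "dev-group-id", "name": "PutSQL"},
}

cleaned = prepare_for_create(sample)
payload = json.dumps(cleaned).encode("utf8")
target = create_url("http://prd.example/nifi-api/", "prd-group-id")
print(target)
print(payload.decode("utf8"))
```

Using `pop(key, None)` instead of `del` means the helper does not fail if a key is already missing, and the deep copy keeps the original dev entity intact in case it is needed again.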