
Create Nifi PutSQL Processor with Python


I want to transfer my NiFi ETL pipeline (mainly PutSQL processors) from my development instance of Apache NiFi to my production instance, ideally with Python for reusability.

My first idea was simply to copy and paste them:

  1. GET the processor from the DEV instance with a GET request on /nifi-api/processors/{id}
  2. PUT the processor to the PRD NiFi instance with a PUT request on /nifi-api/processors/{id}

Code:

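import json
import requests

# nifi_url_dev / nifi_url_prd presumably end with "/nifi-api/";
# header holds the request headers (auth, content type)

# GET processor from DEV and parse the JSON body
response = requests.get(nifi_url_dev + 'processors/' + proc_id, headers=header)
processor = response.json()

# PUT processor to PRD (this is the request that fails with 409)
processor['revision']['version'] = 0  # reset version
payload = json.dumps(processor).encode('utf8')
response = requests.put(nifi_url_prd + 'processors/' + proc_id,
                        data=payload,
                        headers=header)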

This failed on the PUT with an HTTP 409 Conflict error. My guess is that PUT expects a resource to already exist at that URI, while I was trying to put a resource on a URI where none exists yet.
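The status code alone says little; NiFi usually puts a short plain-text explanation of the failure in the response body. A minimal sketch for surfacing it, assuming a requests-style response object with `status_code`, `url` and `text` (the helper name `check_nifi_response` is hypothetical, not part of requests or NiFi):

```python
def check_nifi_response(response):
    """Return the response unchanged, or raise with NiFi's error text.

    Works with any requests-style response object (status_code, url, text).
    Hypothetical helper, not part of requests or NiFi.
    """
    if response.status_code >= 400:
        # NiFi usually explains the failure (e.g. a revision conflict) in the body.
        raise RuntimeError(
            f"NiFi request to {response.url} failed with "
            f"HTTP {response.status_code}: {response.text}"
        )
    return response
```

Wrapping the PUT above as `check_nifi_response(requests.put(...))` would show the reason for the 409 instead of just the code.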

The documentation lists "Create a processor, Set properties, Schedule" next to the processor APIs, but I could not find a dedicated endpoint for creation among them. I went with PUT because its description, "Updates a processor", was the closest thing I could see to creating a new one from scratch.
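PUT on processors/{id} only applies to a processor that already exists on the target instance, which makes it useful for updating PRD once the pipeline has been deployed. NiFi uses the `revision` for optimistic locking, so the body should carry the target's current revision rather than a version reset to 0. A sketch, assuming `target_entity` is the JSON from a GET on the same PRD processor and that a partial `component` with just `id` and `config.properties` is enough for the update (the helper name is hypothetical):

```python
import copy


def build_update_payload(target_entity, new_properties):
    """Build a body for PUT /nifi-api/processors/{id} (hypothetical helper).

    Assumes target_entity was fetched from the same instance that will be
    updated, so its revision is current; NiFi rejects stale revisions.
    """
    return {
        # Reuse the target's current revision (clientId/version) unchanged.
        "revision": copy.deepcopy(target_entity["revision"]),
        "component": {
            "id": target_entity["component"]["id"],
            # Only the properties to change; other fields are left out.
            "config": {"properties": dict(new_properties)},
        },
    }
```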

Do you have any ideas on how to create processors with Python, either by copying existing ones or by creating entirely new ones?


Solution

  • The problem was that the API documentation is a bit misleading. The correct way to create a new processor is a POST to process-groups/{process_group_id}/processors. That endpoint is listed in the docs under "Process Groups", not under "Processors", despite the "Create a processor" description.

    The following worked for me. I had to adapt the JSON a bit, mainly by deleting every ID that belongs to the DEV environment.

    import json
    import requests

    # GET processor from DEV and parse the JSON body
    response = requests.get(nifi_url_dev + 'processors/' + proc_id, headers=header)
    processor = response.json()

    # Remove DEV-specific identifiers before creating the processor on PRD
    del processor["id"]  # Processor ID cannot be specified.
    del processor["uri"]  # URI of the DEV resource.
    del processor["component"]["id"]  # Processor ID cannot be specified.

    del processor["component"]["parentGroupId"]  # Parent process group id should not be specified.
    # If specified, the parent process group id must be the same as specified in the POST-URI.

    # POST (not PUT) the processor into the target process group on PRD
    processor['revision']['version'] = 0  # a new component starts at revision 0
    payload = json.dumps(processor).encode('utf8')
    response = requests.post(nifi_url_prd + 'process-groups/' + process_group + '/processors',
                             data=payload,
                             headers=header)
    response.raise_for_status()  # fail loudly instead of silently ignoring a 4xx/5xx
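For reuse across a whole pipeline, the same steps can be wrapped in two functions: one that cleans the DEV entity and one that performs the GET and POST. A sketch, assuming `session` behaves like a `requests.Session` (its `get(url)` and `post(url, json=...)` calls) and that both base URLs end with "/nifi-api/"; the function names are hypothetical:

```python
import copy


def strip_dev_ids(processor_entity):
    """Return a copy of a DEV processor entity that PRD should accept in a POST.

    Mirrors the deletions in the answer: the entity id, its URI, the component
    id and the parentGroupId are removed, and the revision is reset to 0.
    """
    entity = copy.deepcopy(processor_entity)  # leave the caller's dict untouched
    entity.pop("id", None)
    entity.pop("uri", None)
    entity["component"].pop("id", None)
    entity["component"].pop("parentGroupId", None)
    entity.setdefault("revision", {})["version"] = 0
    return entity


def copy_processor(session, dev_api_url, prd_api_url, proc_id, process_group_id):
    """GET a processor from DEV and POST a cleaned copy into a PRD process group.

    Returns the id NiFi assigned to the new processor.
    """
    response = session.get(f"{dev_api_url}processors/{proc_id}")
    if response.status_code >= 400:
        raise RuntimeError(f"GET failed: HTTP {response.status_code}: {response.text}")

    created = session.post(
        f"{prd_api_url}process-groups/{process_group_id}/processors",
        json=strip_dev_ids(response.json()),
    )
    if created.status_code >= 400:
        raise RuntimeError(f"POST failed: HTTP {created.status_code}: {created.text}")
    return created.json()["id"]
```

With requests, a `requests.Session()` whose `headers` were updated with `header` can be passed as `session`. Returning the new id makes it possible to map DEV ids to PRD ids when copying several processors; this only copies processors, not the connections between them.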
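The question also asks about creating processors from scratch rather than copying them. The same process-groups endpoint should accept a much smaller body for that. A sketch, assuming that a revision of version 0, `component.type` with the fully qualified class name (for PutSQL, `org.apache.nifi.processors.standard.PutSQL`) and a `position` are enough; depending on the NiFi version and installed extensions, a `bundle` may also have to be given. The helper name is hypothetical:

```python
def new_processor_request(nifi_api_url, process_group_id, processor_type,
                          name=None, x=0.0, y=0.0):
    """Return (url, body) for creating a processor from scratch (sketch).

    The body is assumed to be a minimal ProcessorEntity: a revision of
    version 0 plus a component with its type and canvas position.
    """
    # Accept the base URL with or without a trailing slash.
    url = f"{nifi_api_url.rstrip('/')}/process-groups/{process_group_id}/processors"
    component = {"type": processor_type, "position": {"x": x, "y": y}}
    if name is not None:
        component["name"] = name
    body = {"revision": {"version": 0}, "component": component}
    return url, body
```

The result is meant to be sent as `requests.post(url, json=body, headers=header)`; properties and scheduling could then be set with a PUT on the new processor, using the revision returned by the POST.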