I have created a DAG file to schedule Python and Apache NiFi tasks.
To control the NiFi pipeline I have followed the suggestions on the following page: https://towardsdatascience.com/interconnecting-airflow-with-a-nifi-etl-pipeline-8abea0667b8a
I have added a GenerateFlowFile processor to my NiFi pipeline so that Airflow can trigger NiFi, and an UpdateAttribute processor so that Airflow knows when the pipeline is over.
Apache Airflow runs on one AWS EC2 instance and Apache NiFi runs on another AWS EC2 instance.
I have created rules in the AWS security groups so that each of the two EC2 instances can access the other. I have tried using both the private and the public IP addresses of the machines, as well as the machines' security groups (a hypothetical sketch of this kind of rule follows the screenshots below):
Security group of machine 1: (screenshot omitted)
Security group of machine 2: (screenshot omitted)
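For reference, a rule of this kind can be expressed with boto3; this is only a hypothetical sketch, the security group IDs and the region are placeholders, not the real ones:

import boto3

ec2 = boto3.client("ec2", region_name="eu-north-1")  # placeholder region

# Hypothetical rule: allow the Airflow instance's security group to reach
# the NiFi instance's security group on NiFi's HTTPS port 8443
ec2.authorize_security_group_ingress(
    GroupId="sg-nifi-placeholder",  # placeholder: NiFi machine's security group
    IpPermissions=[{
        "IpProtocol": "tcp",
        "FromPort": 8443,
        "ToPort": 8443,
        "UserIdGroupPairs": [{"GroupId": "sg-airflow-placeholder"}],  # placeholder
    }],
)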
Problem: my DAG works with the Python script but not with the NiFi pipeline. The pipeline itself works fine when started from NiFi.
Here is an extract of the DAG file:
# Importing the needed modules
import requests
import json
from time import time
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# Functions to be used in the NiFi tasks
def get_processor(url_nifi_api: str, processor_id: str, token=None):
    """
    This function gets and returns a single processor.
    It makes use of the REST API `/processors/{processor_id}`.
    :param url_nifi_api: String
    :param processor_id: String
    :param token: JWT access token
    :returns: JSON object processor
    """
    # Authorization header
    header = {
        "Content-Type": "application/json",
        "Authorization": "Bearer {}".format(token),
    }
    # GET processor and parse to JSON
    response = requests.get(url_nifi_api + f"processors/{processor_id}", headers=header)
    return json.loads(response.content)
def get_processor_state(url_nifi_api: str, processor_id: str, token=None):
    """
    This function gets and returns a single processor's state.
    It makes use of the REST API `/processors/{processor_id}/state`.
    :param url_nifi_api: String
    :param processor_id: String
    :param token: JWT access token
    :returns: JSON object processor's state
    """
    # Authorization header (the token is optional)
    if token is None:
        header = {"Content-Type": "application/json"}
    else:
        header = {
            "Content-Type": "application/json",
            "Authorization": "Bearer {}".format(token),
        }
    # GET processor state and parse to JSON
    response = requests.get(
        url_nifi_api + f"processors/{processor_id}/state", headers=header
    )
    return json.loads(response.content)
def get_token(url_nifi_api: str, access_payload: dict):
    """
    This function retrieves a JWT token by authenticating the user.
    It makes use of the REST API `/access/token`.
    :param url_nifi_api: the base URL of the NiFi API.
    :param access_payload: dictionary with keys 'username' & 'password' and
                           fitting values.
    :return: JWT token
    """
    # Request headers
    header = {
        "Accept-Encoding": "gzip, deflate, br",
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "*/*",
    }
    response = requests.post(
        url_nifi_api + "access/token", headers=header, data=access_payload
    )
    return response.content.decode("ascii")
def update_processor_status(processor_id: str, new_state: str, token, url_nifi_api):
    """
    This function starts or stops a processor: it retrieves the processor to
    get the current revision and then PUTs a JSON with the desired state
    to the API.
    It needs the function get_processor.
    :param processor_id: id of the processor to receive the new state.
    :param new_state: string representing the new state,
                      acceptable values are: STOPPED or RUNNING.
    :param token: a JWT access token for NiFi.
    :param url_nifi_api: URL to the NiFi API.
    :return: the API response.
    """
    # Retrieve processor from `/processors/{processor_id}`
    processor = get_processor(url_nifi_api, processor_id, token)
    # Create a JSON with the new state and the processor's revision
    put_dict = {
        "revision": processor["revision"],
        "state": new_state,
        "disconnectedNodeAcknowledged": True,
    }
    # Dump the JSON and PUT it to the processor's run-status endpoint
    payload = json.dumps(put_dict).encode("utf8")
    header = {
        "Content-Type": "application/json",
        "Authorization": "Bearer {}".format(token),
    }
    response = requests.put(
        url_nifi_api + f"processors/{processor_id}/run-status",
        headers=header,
        data=payload,
    )
    return response
def parse_state(json_obj, state_key: str):
    """
    This function retrieves the value of a state by the key of the state out
    of the JSON.
    :param json_obj: the processor's general state.
    :param state_key: the key for the specific state.
    :raises ValueError: if the passed key cannot be found in the processor state.
    :return: value of the matching key.
    """
    states = json_obj["componentState"]["localState"]["state"]
    for state in states:
        if state["key"] == state_key:
            return state["value"]
    raise ValueError(f"Could not find {state_key}")

def pause(secs):
    # Busy-wait until `secs` seconds have elapsed
    init_time = time()
    while time() < init_time + secs:
        pass
def startnifi():
    """
    This function sets the NiFi processor GenerateFlowFile into the RUNNING
    state, waits 15 seconds to give the processor time to create a flow file,
    then sets the GenerateFlowFile processor into the STOPPED state.
    It needs the following functions:
    - get_token (retrieves a JSON Web Token by authenticating the user)
    - update_processor_status
      (retrieves the current status with a GET request, sets the state into
      a custom JSON and uses a PUT request to set the new status)
    """
    # Initializing the variables
    url_nifi_api = "https://<ip address>:8443/nifi/?processGroupId=<clip>"
    # Check the id from the processor settings page in NiFi
    processor_id = "<GenerateFlowFile processor id>"
    access_payload = {"username": "<username>",
                      "password": "<password>"
                      }
    token = get_token(url_nifi_api, access_payload)
    response = update_processor_status(processor_id, "RUNNING", token, url_nifi_api)
    pause(15)  # wait for 15 seconds to give NiFi time to create a flow file
    response = update_processor_status(processor_id, "STOPPED", token, url_nifi_api)
def finishnifi():
    """
    This function queries the state of the last NiFi processor
    (UpdateAttribute) in a while loop, reiterating the query every 60 seconds
    until the state's timestamp changes.
    """
    # Initializing the variables
    url_nifi_api = "https://<ip address>:8443/nifi/?processGroupId=<clip>"
    # Check the id from the processor settings page in NiFi
    processor_id = "<UpdateAttribute processor id>"
    access_payload = {"username": "<username>",
                      "password": "<password>"
                      }
    timestamp_property = "last_tms"  # the processor's attribute name
    token = get_token(url_nifi_api, access_payload)
    # Get the current timestamp
    processor_state = get_processor_state(url_nifi_api, processor_id, token=token)
    value_start = parse_state(processor_state, timestamp_property)
    # Query and wait until the timestamp is updated
    while True:
        processor_state = get_processor_state(url_nifi_api, processor_id, token=token)
        value_current = parse_state(processor_state, timestamp_property)
        if value_start == value_current:
            print("Waiting...")
            pause(60)
        else:
            print(f"Update found: {value_current}")
            break
# Initializing the default arguments
# Using dynamic dates for start_date would lead to tasks that never start
default_args = {
    'owner': 'Benkku',
    'start_date': datetime(2023, 7, 3, 6),  # year, month, day, hour
    'retries': 0,
    'retry_delay': timedelta(minutes=5)
}
# Instantiating a DAG object
prod_to_alfaDEV_dag = DAG(dag_id='prod_to_alfaDEV',
                          default_args=default_args,
                          description='Tasks to copy Timescale production into '
                                      'Timescale alfa',
                          # cron schedule (minute, hour, month day, month, week day)
                          schedule=None,  # for testing only
                          # schedule='6 3 14 * *',  # 03:06 UTC on the 14th
                          #                         # of each month
                          catchup=False,  # if False the scheduler creates a DAG run
                                          # only for the latest schedule interval
                          tags=['Timescale', 'alfa', 'refresh'])
# Creating the 4th task
# This task starts an Apache NiFi pipeline
nifistart_task = PythonOperator(task_id='nifistart',
                                python_callable=startnifi,
                                dag=prod_to_alfaDEV_dag)
# Creating the 5th task
# This task monitors the NiFi pipeline until the pipeline is over
nififinish_task = PythonOperator(task_id='nififinish',
                                 python_callable=finishnifi,
                                 dag=prod_to_alfaDEV_dag)
# Setting the order of the tasks' execution
nifistart_task >> nififinish_task
Regarding url_nifi_api: for 'ip address' I have tried both the private and the public IP address of the machine. The rest of the URL ('clip') was copied from Apache NiFi when opening the process group.
Regarding processor_id: the 'processor id' in def startnifi was copied from Apache NiFi via Settings > Id on the first processor (GenerateFlowFile); in def finishnifi the Id was read from the last processor (UpdateAttribute) in the same way.
In access_payload I have used the username and password I use in Apache NiFi.
This is the error I get in Airflow when running the DAG, in the first task (nifistart):
Failed to establish a new connection: [Errno 111] Connection refused
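Note that [Errno 111] usually means the host was reached but nothing was listening on that port, whereas a blocked security group typically shows up as a timeout instead. A quick way to check reachability from the Airflow machine is a small probe like the one below; this is only a sketch, the IP address is a placeholder, and /nifi-api/system-diagnostics is just a convenient endpoint to hit (it returns 401 without a token, which still proves something is listening):

import requests

try:
    response = requests.get("https://<ip address>:8443/nifi-api/system-diagnostics",
                            verify=False, timeout=5)
    # Any HTTP status (even 401 Unauthorized) means something is listening
    print("Reachable, HTTP status:", response.status_code)
except requests.exceptions.ConnectionError as err:
    # [Errno 111] lands here: the host refused the TCP connection
    print("Nothing is listening on that address/port:", err)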
Any idea what might be wrong?
I am running Airflow 2.6.2 and Apache NiFi 1.18.0 on Ubuntu Linux 22.04.1, with Airflow installed from PyPI.
Thanks,
Bernardo Di Chiara
The problem has been solved with the following modifications:
The URL was wrong. The correct URL in def startnifi() and def finishnifi() is:
url_nifi_api = "https://ipaddress:8443/nifi-api/"
where ipaddress is the private IP address of the machine running Apache NiFi. The URL used before pointed at the NiFi UI (/nifi/?processGroupId=...), not at the REST API (/nifi-api/).
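With that base URL, the helper functions in the DAG compose the actual REST endpoints, for example (placeholder IP):

url_nifi_api = "https://ipaddress:8443/nifi-api/"
# get_token() then posts to:
#   https://ipaddress:8443/nifi-api/access/token
# and get_processor() gets, e.g.:
#   https://ipaddress:8443/nifi-api/processors/<processor id>
# These paths do not exist under the old UI base URL, which is why the
# REST calls failed before.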
Since both machines are in the same private cloud, the verify=False argument has been added to all the HTTP calls (requests.get, requests.post and requests.put) in the DAG file, so that the TLS certificate is not verified.
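For example, get_token() becomes the following; a sketch, where silencing urllib3's InsecureRequestWarning is an optional addition of mine and not part of the original fix:

import requests
import urllib3

# Optional: silence the warning that requests prints when verify=False is used
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def get_token(url_nifi_api: str, access_payload: dict):
    header = {
        "Accept-Encoding": "gzip, deflate, br",
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "*/*",
    }
    # verify=False skips TLS certificate verification; acceptable here
    # because both machines sit in the same private cloud
    response = requests.post(
        url_nifi_api + "access/token", headers=header,
        data=access_payload, verify=False,
    )
    return response.content.decode("ascii")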
On the Linux machine running NiFi, .../nifi-1.18.0/conf/nifi.properties has been edited to allow access from the other EC2 machine. With the old setting NiFi was listening only on the loopback interface, which is exactly what produces the [Errno 111] Connection refused seen above:
Old configuration: nifi.web.https.host=127.0.0.1
New configuration: nifi.web.https.host=0.0.0.0 (listen on all network interfaces)
nifi.sh has then been stopped and started again (bin/nifi.sh stop followed by bin/nifi.sh start) so that the new configuration takes effect.
Finally, the NiFi UI has been opened in the browser from: https://0.0.0.0:8443/nifi/login