I have a setup of Apache NiFi for my ETL pipeline and want to start (and later monitor) a specific processor with Apache Airflow. I see two ways of achieving this from within an Airflow DAG.
I looked at the official Airflow documentation and know how to write a (basic) DAG with a PythonOperator:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id='python_nifi_operator',
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['example'],
)

def generate_flow_file():
    """Generate and insert a flow file"""
    # connect to NiFi
    pass
    # access processor
    pass
    # create flow file
    pass
    # insert flow file
    pass
    return 'Success-message for the log'

run_this = PythonOperator(
    task_id='generate_a_custom_flow_file',
    python_callable=generate_flow_file,
    dag=dag,
)
The question is: how can I generate a flow file with Python? I have been looking for a library, but I only find other Stack Overflow posts with code excerpts that don't help me, and I can't even find documentation for the packages they used. Any tips / complete code examples / links are welcome.
There is no API to 'generate' a FlowFile, and it wouldn't really make much sense to have one: a FlowFile is an internal NiFi construct, and data only becomes a FlowFile once a processor brings it into the flow.
That said, you could use the GenerateFlowFile processor and stop/start it with the REST API; there have been previous questions asking how to do this. See the REST API reference (https://nifi.apache.org/docs/nifi-docs/rest-api/index.html) or the nipyapi client (https://pypi.org/project/nipyapi/), and the sketch below.
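A minimal, untested sketch using nipyapi; the host URL and the processor name 'GenerateFlowFile' are assumptions you would adjust to your own canvas:

import nipyapi

# Assumption: NiFi's API is reachable at this address (adjust host/port/TLS as needed)
nipyapi.config.nifi_config.host = 'http://localhost:8080/nifi-api'

def trigger_generate_flow_file():
    """Start the processor, let it emit FlowFiles, then stop it again."""
    # Assumption: the processor on the canvas is named 'GenerateFlowFile'
    processor = nipyapi.canvas.get_processor('GenerateFlowFile')
    nipyapi.canvas.schedule_processor(processor, scheduled=True)   # start
    # ... poll status / sleep here for as long as it should run ...
    nipyapi.canvas.schedule_processor(processor, scheduled=False)  # stop
    return 'Triggered GenerateFlowFile'

A callable like this can be dropped straight into the PythonOperator from your question.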
Or you could have a ListenHTTP/HandleHttpRequest processor listening on an endpoint just for Airflow, which you could trigger in Python by sending an empty HTTP request to the configured endpoint, thus generating a FlowFile. For example:
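A sketch assuming a ListenHTTP processor on port 8081 with its default Base Path of 'contentListener' (both values are assumptions, match them to your processor's configuration):

import requests

def trigger_listen_http():
    """POST an empty body to ListenHTTP; every request that arrives becomes a FlowFile."""
    # Assumption: ListenHTTP runs on nifi-host:8081 with base path 'contentListener'
    response = requests.post('http://nifi-host:8081/contentListener', data='')
    response.raise_for_status()  # ListenHTTP answers 200 OK once the FlowFile is queued
    return 'FlowFile created'

This approach has the nice property that the processor stays running the whole time, so Airflow only needs network access to the HTTP port rather than credentials for the NiFi API.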