
Starting Apache Nifi with Apache Airflow - how to generate a flow file


I have Apache NiFi set up for my ETL pipeline and want to start (and later monitor) a specific processor from Apache Airflow.

I see two ways of achieving this from within an airflow DAG:

  1. Generate a flow file from scratch and insert it into a NiFi queue/processor
  2. Trigger a "generate-flow-file-processor" to create a flow file, which in turn inserts it into the queue

I looked at the official Airflow documentation and know how to write a (basic) DAG with a PythonOperator:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id='python_nifi_operator',
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['example'],
)

def generate_flow_file():
    """Generate and insert a flow file"""
    # connect to Nifi
    pass
    # access processor
    pass
    # create flow file
    pass 
    # insert flow file 
    pass 
    return 'Success-message for the log'

run_this = PythonOperator(
    task_id='generate_a_custom_flow_file',
    python_callable=generate_flow_file,
    dag=dag,
)

The question is: how can I generate a flow file with Python? I have been looking for a library, but I only find other Stack Overflow posts with code excerpts that don't help me, and I can't even find documentation for the packages they use. Any tips, complete code examples or links are welcome.


Solution

  • There is no API to 'generate' a FlowFile directly, and it wouldn't really make much sense to have one.

    That said, you could use the GenerateFlowFile processor and start/stop it with the REST API; previous questions have asked how to do this using the API. See the NiFi REST API documentation https://nifi.apache.org/docs/nifi-docs/rest-api/index.html and the nipyapi client https://pypi.org/project/nipyapi/

    Or you could have a ListenHTTP/HandleHttpRequest processor listening on an endpoint dedicated to Airflow, which you could trigger from Python by sending an empty HTTP request to the configured endpoint, thus generating a FlowFile.
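A minimal sketch of both options, using only the Python standard library (`urllib`) so it can run inside a PythonOperator without extra packages. The host names, ports, the processor UUID you pass in, and all helper names are hypothetical placeholders. The REST part assumes an unsecured NiFi instance (a secured one additionally needs a token or client certificate) and NiFi's `GET /processors/{id}` and `PUT /processors/{id}/run-status` endpoints from the REST API docs linked above; check them against your NiFi version. Note that a started GenerateFlowFile keeps emitting FlowFiles on its run schedule until it is stopped again.

```python
import json
import urllib.request

# Hypothetical placeholders: adjust to your installation. Assumes NiFi is
# reachable without authentication, and a ListenHTTP processor listens on
# port 8081 with its default Base Path "contentListener".
NIFI_API = "http://nifi-host:8080/nifi-api"
LISTEN_URL = "http://nifi-host:8081/contentListener"


def _send(req, timeout=30):
    """Send a prepared request and return (HTTP status, raw response body)."""
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status, resp.read()


# Option 1: start/stop GenerateFlowFile through the REST API.
def build_run_status_payload(version, state, client_id="airflow"):
    """Body for PUT /processors/{id}/run-status.

    NiFi uses optimistic locking, so `version` must be the processor's
    current revision as returned by GET /processors/{id}.
    """
    if state not in ("RUNNING", "STOPPED"):
        raise ValueError(f"unsupported state: {state}")
    return {"revision": {"clientId": client_id, "version": version}, "state": state}


def set_processor_state(processor_id, state, api=NIFI_API):
    """Start ("RUNNING") or stop ("STOPPED") a processor by its UUID."""
    # Fetch the current revision first; NiFi rejects updates with a stale one.
    _, body = _send(urllib.request.Request(f"{api}/processors/{processor_id}"))
    version = json.loads(body)["revision"]["version"]
    payload = build_run_status_payload(version, state)
    req = urllib.request.Request(
        f"{api}/processors/{processor_id}/run-status",
        data=json.dumps(payload).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )
    return _send(req)[0]


# Option 2: POST to a ListenHTTP endpoint; the body becomes the FlowFile content.
def build_trigger_request(url=LISTEN_URL, payload=None):
    """POST request for ListenHTTP; payload=None sends an empty body."""
    data = b"" if payload is None else json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=data,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def trigger_listen_http(url=LISTEN_URL, payload=None):
    """Send the trigger request and return the HTTP status code."""
    return _send(build_trigger_request(url, payload))[0]
```

Either function could serve as the `python_callable` of the question's PythonOperator, e.g. `python_callable=trigger_listen_http`. For option 1 you would pass the processor UUID via `op_args` and call `set_processor_state` with "RUNNING" and later with "STOPPED". nipyapi wraps the same REST API if you prefer a client library over raw HTTP calls.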