Tags: python, airflow, directed-acyclic-graphs

Triggering an airflow dag based on filesystem changes


I am trying to write a pipeline where a Postgres database should be updated with the contents of a CSV file whenever that file is brought into a folder. I have written a DAG that creates the table and pushes the CSV contents when it is triggered from the web UI. Here's the code:

from datetime import datetime
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator
import psycopg2

with DAG('Write_data_to_PG',
         description='This DAG is for writing data to postgres.',
         schedule_interval='*/5 * * * *',
         start_date=datetime(2018, 11, 1),
         catchup=False) as dag:
    create_table = PostgresOperator(
        task_id='create_table',
        sql="""CREATE TABLE users(
            id integer PRIMARY KEY,
            email text,
            name text,
            address text
        )
        """,
    )

    def my_func():
        print('Pushing data in database.')
        conn = psycopg2.connect("host=localhost dbname=testdb user=testuser")
        print(conn)

        cur = conn.cursor()
        print(cur)

        with open('test.csv', 'r') as f:
            next(f)  # Skip the header row.
            cur.copy_from(f, 'users', sep=',')

        conn.commit()
        print(conn)
        print('Done.')


    python_task = PythonOperator(task_id='python_task', python_callable=my_func)

    create_table >> python_task

I am not able to figure out how to trigger the tasks when the CSV is copied or moved manually into the folder. Any help would be appreciated; thanks in advance.


Solution

  • It turns out Airflow has a module for exactly this requirement. I solved the problem using the FileSensor provided by Airflow itself.

    According to the documentation:

    FileSensor: Waits for a file or folder to land in a filesystem. If the path given is a directory, then this sensor will only return true if any files exist inside it (either directly, or within a subdirectory).

    Here is the modified code. It waits for a file called test.csv and proceeds to the next task only when it finds the file in the watched folder (you specify the base path via the sensor's connection, as described below):

    from datetime import datetime
    from airflow import DAG
    from airflow.contrib.sensors.file_sensor import FileSensor
    from airflow.operators.postgres_operator import PostgresOperator
    from airflow.operators.python_operator import PythonOperator
    import psycopg2
    
    with DAG('Write_data_to_PG',
             description='This DAG is for writing data to postgres.',
             schedule_interval='*/5 * * * *',
             start_date=datetime(2018, 11, 1),
             catchup=False) as dag:
        create_table = PostgresOperator(
            task_id='create_table',
            sql="""CREATE TABLE users(
                id integer PRIMARY KEY,
                email text,
                name text,
                address text
            )
            """,
        )
    
    
        def my_func():
            print('Pushing data into database.')
            conn = psycopg2.connect("host=localhost dbname=testdb user=testuser")
            print(conn)
    
            cur = conn.cursor()
            print(cur)
    
            # Note: open() resolves this path relative to the worker's
            # working directory, so prefer an absolute path in practice.
            with open('test.csv', 'r') as f:
                next(f)  # Skip the header row.
                cur.copy_from(f, 'users', sep=',')
    
            conn.commit()
            print(conn)
            print('Done.')
    
    
        # 'my_file_system' must exist as an Airflow connection of type
        # "File (path)"; the sensor joins that connection's base path with
        # `filepath` when poking (see the connection sketch below).
        file_sensing_task = FileSensor(task_id='sense_the_csv',
                                       filepath='test.csv',
                                       fs_conn_id='my_file_system',
                                       poke_interval=10)
    
        python_task = PythonOperator(task_id='populate_data', python_callable=my_func)
    
        create_table >> file_sensing_task >> python_task
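
    For the sensor to find anything, fs_conn_id must reference an existing Airflow connection of type "File (path)" whose extra field holds the base path to watch. Below is a minimal sketch of registering that connection programmatically; the path /home/airflow/data is only a placeholder for your actual folder, and you can equally create the connection through the web UI (Admin > Connections).

    from airflow import settings
    from airflow.models import Connection

    # Placeholder base path -- point this at the folder the CSV lands in.
    conn = Connection(
        conn_id='my_file_system',
        conn_type='fs',  # appears as "File (path)" in the web UI
        extra='{"path": "/home/airflow/data"}',
    )

    # Persist the connection in Airflow's metadata database.
    session = settings.Session()
    session.add(conn)
    session.commit()

    With this connection in place, the sensor pokes for /home/airflow/data/test.csv every 10 seconds. Note that my_func still opens test.csv relative to the worker's working directory, so make sure both paths resolve to the same file.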