How to dynamically create tasks in Airflow


I can't figure out how to create tasks dynamically in Airflow at schedule time. My DAG is defined before I know how many tasks will be needed at run time. That is, each time the DAG is triggered, I would like to pass in the directory to be processed and have the DAG below build its list of tasks from that directory.

This is as far as I have got:

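import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

args = {
    'owner': 'airflow',
    'start_date': datetime(2004, 11, 12),
}

dag = DAG(
    dag_id='dyn_test',
    default_args=args,
    schedule_interval='@once'
)


def change_file(path):
    """Hypothetical placeholder: change the file's structure."""


def store_file(path):
    """Hypothetical placeholder: store the changed file."""


dir = '/home/uname/dir'
# File names directly inside `dir`, sorted so task ids stay stable between parses
filesInDir = sorted(next(os.walk(dir))[2])

for i, file in enumerate(filesInDir):
    path = os.path.join(dir, file)
    task1 = PythonOperator(task_id=f'change_{i}', python_callable=change_file,
                           op_args=[path], dag=dag)  # change 'file' structure
    task2 = PythonOperator(task_id=f'store_{i}', python_callable=store_file,
                           op_args=[path], dag=dag)  # store changed 'file'

    task1 >> task2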

How should I pass the 'dir' variable when triggering the DAG, so that task1 and task2 run once for each file present in 'dir'?
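The `next(os.walk(dir))[2]` idiom used above keeps only the first tuple that `os.walk` yields, i.e. the names of the non-directory entries directly inside `dir`; files in subdirectories are not listed. A minimal standalone sketch of that behaviour in plain Python (the function name `list_top_level_files` is hypothetical, and the sorting is added here so the order is deterministic, since `os.walk` does not guarantee one):

```python
import os
import tempfile


def list_top_level_files(directory):
    """Return the sorted names of non-directory entries directly inside `directory`.

    Same result as next(os.walk(directory))[2], but in a stable order.
    """
    # os.walk yields (dirpath, dirnames, filenames) tuples; the first one
    # describes `directory` itself, so its third element is the file list.
    _, _, filenames = next(os.walk(directory))
    return sorted(filenames)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        for name in ("b.txt", "a.txt"):
            open(os.path.join(tmp, name), "w").close()
        os.mkdir(os.path.join(tmp, "subdir"))  # directories are not returned
        print(list_top_level_files(tmp))  # ['a.txt', 'b.txt']
```

One thing to watch for: if the directory does not exist, `os.walk` silently yields nothing and `next` raises `StopIteration`, which would show up as an import error for the DAG file.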


Solution

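  • Instead of hard-coding the directory, you can read it from an Airflow Variable or from an environment variable. Keep in mind that either value is read whenever the DAG file is parsed, so the task list follows the value current at parse time; it is not a per-trigger parameter.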

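    import os
    from datetime import datetime

    from airflow import DAG
    from airflow.models import Variable
    from airflow.operators.python import PythonOperator

    # Using Airflow Variables: a Variable named "dir", set in the UI or with
    #   airflow variables set dir /home/uname/dir
    dir = Variable.get("dir")

    # Using Env Vars: uncomment to read the environment variable "dir1" instead
    # dir = os.environ["dir1"]

    args = {
        'owner': 'airflow',
        'start_date': datetime(2004, 11, 12),
    }

    dag = DAG(
        dag_id='dyn_test',
        default_args=args,
        schedule_interval='@once'
    )


    def change_file(path):
        """Hypothetical placeholder: change the file's structure."""


    def store_file(path):
        """Hypothetical placeholder: store the changed file."""


    # File names directly inside `dir`, sorted so task ids stay stable between parses
    filesInDir = sorted(next(os.walk(dir))[2])

    for i, file in enumerate(filesInDir):
        path = os.path.join(dir, file)
        task1 = PythonOperator(task_id=f'change_{i}', python_callable=change_file,
                               op_args=[path], dag=dag)  # change 'file' structure
        task2 = PythonOperator(task_id=f'store_{i}', python_callable=store_file,
                               op_args=[path], dag=dag)  # store changed 'file'

        task1 >> task2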
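With the environment-variable option, the directory comes from the environment of whichever process parses the DAG file, and one task pair is created per file found there. The sketch below models only that planning step, without Airflow, so it can be checked locally. `resolve_input_dir` and `plan_tasks` are hypothetical helpers, the variable name `dir1` comes from the answer, and the index-based task ids (`change_0`, `store_0`, ...) are an assumption rather than anything Airflow requires.

```python
import os
import tempfile


def resolve_input_dir(env_var="dir1"):
    """Read the input directory from an environment variable (hypothetical helper)."""
    try:
        return os.environ[env_var]
    except KeyError:
        # Fail with a readable message instead of a bare KeyError at parse time
        raise RuntimeError(f"environment variable {env_var!r} is not set") from None


def plan_tasks(directory):
    """Return one (change_task_id, store_task_id, path) triple per top-level file."""
    # Sorted so that index-based task ids map to the same files on every parse
    filenames = sorted(next(os.walk(directory))[2])
    return [
        (f"change_{i}", f"store_{i}", os.path.join(directory, name))
        for i, name in enumerate(filenames)
    ]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        for name in ("one.txt", "two.txt"):
            open(os.path.join(tmp, name), "w").close()
        os.environ["dir1"] = tmp
        for change_id, store_id, path in plan_tasks(resolve_input_dir()):
            print(change_id, ">>", store_id, path)
```

Each triple corresponds to one `task1 >> task2` pair: in the DAG file you would create the two operators with those ids and pass `path` to both callables.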