I can't figure out how to dynamically create tasks in Airflow at schedule time. My DAG is defined before I know how many tasks will be required at run time. That is, on each DAG trigger I would like to pass in the directory to be processed, and have the DAG below create one set of tasks per file in it.
Here is what I have so far; I couldn't come up with anything that makes the directory configurable:
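import os
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

args = {
    'owner': 'airflow',
    'start_date': datetime(2004, 11, 12),
}
dag = DAG(
    dag_id='dyn_test',
    default_args=args,
    schedule_interval='@once',
)
dir = '/home/uname/dir'
filesInDir = next(os.walk(dir))[2]  # top-level files only
for file in filesInDir:
    # EmptyOperator is only a stand-in; each task_id must be unique per file
    task1 = EmptyOperator(task_id=f'change_{file}', dag=dag)  # change 'file' structure
    task2 = EmptyOperator(task_id=f'store_{file}', dag=dag)   # store changed 'file'
    task1 >> task2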
How should I pass the 'dir' variable when triggering the DAG so that task1 and task2 are created once for each file present in 'dir'?
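To make the goal concrete, here is a minimal sketch (plain Python, no Airflow needed) of what the loop is expected to produce: one (change, store) pair of task ids per top-level file. The helper name `task_ids_for_dir` is hypothetical, and the character filter assumes Airflow's task_id rule that only letters, digits, underscores, dots and dashes are allowed.

```python
import os
import re


def task_ids_for_dir(directory):
    """Return one (change, store) task_id pair per top-level file in `directory`.

    Mirrors next(os.walk(dir))[2] from the question: subdirectories are skipped.
    Names are sorted so the generated DAG shape does not depend on listing order.
    """
    files = sorted(next(os.walk(directory))[2])
    pairs = []
    for name in files:
        # Replace any character not allowed in a task_id with '_'
        safe = re.sub(r"[^\w.-]", "_", name)
        pairs.append((f"change_{safe}", f"store_{safe}"))
    return pairs
```

Sorting matters because os.walk makes no ordering guarantee; note that two names differing only in replaced characters (e.g. "b c" and "b_c") would collide and need extra disambiguation.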
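You can use Airflow Variables or environment variables. Both are read when the scheduler parses the DAG file, so the tasks are generated from whatever value is set at parse time, not from anything passed with an individual trigger.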
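import os
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.empty import EmptyOperator

# Option 1: read the directory from an Airflow Variable named "dir"
dir = Variable.get("dir")

# Option 2: read it from an environment variable instead
# dir = os.environ["dir1"]

args = {
    'owner': 'airflow',
    'start_date': datetime(2004, 11, 12),
}
dag = DAG(
    dag_id='dyn_test',
    default_args=args,
    schedule_interval='@once',
)

# Evaluated every time the scheduler parses this file
filesInDir = next(os.walk(dir))[2]
for file in filesInDir:
    # EmptyOperator is only a stand-in for the real operators
    task1 = EmptyOperator(task_id=f'change_{file}', dag=dag)  # change 'file' structure
    task2 = EmptyOperator(task_id=f'store_{file}', dag=dag)   # store changed 'file'
    task1 >> task2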
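With the environment-variable option, `os.environ["dir1"]` raises KeyError at parse time if the variable is missing, which makes the whole DAG fail to import. A minimal sketch of a safer lookup, assuming the variable is still called `dir1`; the helper name `resolve_dir` and its fallback behaviour are hypothetical. (For the Airflow Variable option, Airflow 2.x's `Variable.get("dir", default_var=...)` plays the same role.)

```python
import os
from typing import Mapping, Optional


def resolve_dir(env: Mapping[str, str] = os.environ,
                default: Optional[str] = None) -> str:
    """Return the directory to scan, taken from the 'dir1' environment variable.

    Falls back to `default` when 'dir1' is unset or empty, and fails with a
    clear message if neither is available or the path is not a directory.
    """
    value = env.get("dir1") or default
    if value is None:
        raise RuntimeError("set the 'dir1' environment variable or pass a default")
    if not os.path.isdir(value):
        raise NotADirectoryError(value)
    return value
```

In the DAG file this would replace Option 2 as `dir = resolve_dir(default='/home/uname/dir')`; the variable must be set in the environment of the scheduler (and workers), not in the shell you trigger from.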