I just started with Airflow DAGs and ran into a strange issue. I am using Airflow version 2.3.3 with the SequentialExecutor.
The script I used:
import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

dag_args = {
    'owner': 'hao',
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=1)
}

with DAG(
    dag_id='dependency_experiment',
    default_args=dag_args,
    description='experiment with the DAG task dependency expression',
    start_date=datetime.datetime.now(),
    schedule_interval='@daily',
    dagrun_timeout=datetime.timedelta(seconds=10),
) as dag:
    pyOp = PythonOperator(
        task_id='pyOp',
        # 'haha' is undefined on purpose, so this task raises NameError
        python_callable=lambda x: haha * x,
        op_kwargs={'x': 10}
    )

    pyOp
The log snippet of this task:
NameError: name 'haha' is not defined
[2022-07-27, 18:19:34 EDT] {taskinstance.py:1415} INFO - Marking task as UP_FOR_RETRY. dag_id=dependency_experiment, task_id=pyOp, execution_date=20220728T021932, start_date=20220728T021934, end_date=20220728T021934
[2022-07-27, 18:19:34 EDT] {standard_task_runner.py:92} ERROR - Failed to execute job 44 for task pyOp (name 'haha' is not defined; 19405)
[2022-07-27, 18:19:34 EDT] {local_task_job.py:156} INFO - Task exited with return code 1
[2022-07-27, 18:19:34 EDT] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
Problem:
I purposely defined a PythonOperator that would fail. When I deployed the script as a DAG, the task raised the exception as expected; however, the status for this task is always skipped. I cannot figure out why the task doesn't show a failed status as expected. Any suggestions will be much appreciated.
It's because you are defining 'retries' and 'retry_delay' in your dag_args dictionary.
From the Airflow documentation on default_args:
default_args (Optional[Dict]) – A dictionary of default parameters to be used as constructor keyword parameters when initialising operators. Note that operators have the same hook, and precede those defined here, meaning that if your dict contains 'depends_on_past': True here and 'depends_on_past': False in the operator's call default_args, the actual value will be False.
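For example, per the precedence described in that quote, a retries value passed directly to an operator wins over the one in default_args. A minimal sketch of this (the dag_id, task_id, and callable here are made up for illustration):

import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='override_example',            # hypothetical dag_id
    default_args={'retries': 2},          # the dictionary default
    start_date=datetime.datetime(2022, 7, 1),
    schedule_interval=None,
) as dag:
    # retries=0 passed directly to the operator takes precedence over
    # 'retries': 2 from default_args, so this task fails immediately
    # with no retry attempts.
    no_retry = PythonOperator(
        task_id='no_retry',
        python_callable=lambda: undefined_name,  # deliberately raises NameError
        retries=0,
    )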
When you set 'retries' to a value, Airflow expects the task to be retried at a later time, so it shows it in the UI as skipped rather than failed.
If you delete 'retries' and 'retry_delay' from the dag_args, you'll see the task set to failed when you trigger the DAG.
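In other words, the trimmed-down dictionary would look like this (a sketch of your dag_args with only the retry settings removed):

dag_args = {
    'owner': 'hao',
    # 'retries' and 'retry_delay' removed: a failing task is now
    # marked FAILED right away instead of UP_FOR_RETRY
}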
When I ran your code, I see this in the logs:
INFO - Marking task as UP_FOR_RETRY. dag_id=dependency_experiment, task_id=pyOp, execution_date=20220729T060953, start_date=20220729T060953, end_date=20220729T060953
After I delete 'retries' and 'retry_delay', the same log line becomes:
INFO - Marking task as FAILED. dag_id=dependency_experiment, task_id=pyOp, execution_date=20220729T061031, start_date=20220729T061031, end_date=20220729T061031