Tags: airflow, airflow-2.x

Why does the Airflow ExternalTaskSensor not work on a DAG containing a PythonOperator?


  • Airflow version: v2.3.0
  • OS: Ubuntu 22.04

1. DAG structure that works (no failures)

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator  # used in the failing variant below
from airflow.sensors.external_task import ExternalTaskSensor


def temp_task():
    print(1)

a_dag = DAG(
    dag_id='a_dag1', default_args={'owner': 'brownbear'}, start_date=datetime(2021, 11, 6, 0, 0, 0),
    schedule_interval="*/1 * * * *", tags=['external'],
    catchup=False
)
with a_dag:
    start = DummyOperator(task_id='wow1')
    end = DummyOperator(task_id='wow2')
    start >> end


b_dag = DAG(
    dag_id='a_dag2', default_args={'owner': 'brownbear'}, start_date=datetime(2021, 11, 6, 0, 0, 0),
    schedule_interval='*/1 * * * *', tags=['external'],
    catchup=False
)
with b_dag:
    downstream_task1 = ExternalTaskSensor(
        task_id="downstream_task1",
        mode='reschedule',
        external_dag_id='a_dag1',
        external_task_id="wow2",
        timeout=600,
    )
    start2 = DummyOperator(task_id='start2')
    start2 >> downstream_task1
  • result: works as expected; the sensor succeeds.
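For context, ExternalTaskSensor by default looks for the external task instance at the same logical date, which is why both DAGs above run on the identical */1 * * * * schedule. If the schedules differed, execution_delta or execution_date_fn would be needed; a minimal sketch under that assumption (the dag_id and the 5-minute offset are illustrative, not part of the question):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor

# Hypothetical variant: the upstream DAG's schedule is offset by 5 minutes,
# so the sensor needs execution_delta to find the matching a_dag1 run.
with DAG(
    dag_id='a_dag2_offset_example',
    start_date=datetime(2021, 11, 6),
    schedule_interval='*/1 * * * *',
    catchup=False,
):
    wait_for_wow2 = ExternalTaskSensor(
        task_id='wait_for_wow2',
        external_dag_id='a_dag1',
        external_task_id='wow2',
        execution_delta=timedelta(minutes=5),  # illustrative offset only
        mode='reschedule',
        timeout=600,
    )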

2. DAG structure that fails

  • only the a_dag1 part changed (DummyOperator -> PythonOperator); imports are the same as above
def temp_task():
    print(1)

a_dag = DAG(
    dag_id='a_dag1', default_args={'owner': 'brownbear'}, start_date=datetime(2021, 11, 6, 0, 0, 0),
    schedule_interval="*/1 * * * *", tags=['external'],
    catchup=False
)
with a_dag:
    # Doesn't work...
    task1 = PythonOperator(task_id='wow1', python_callable=temp_task)
    task2 = PythonOperator(task_id='wow2', python_callable=temp_task)
    task1 >> task2
  • result: the sensor task fails (see the scheduler log below).

  • log (Web UI): nothing appears for the failed sensor task.

  • log (scheduler process):

airflow-scheduler_1  | [2022-08-19 01:12:02,238] {dag.py:2915} INFO - Setting next_dagrun for a_dag2 to 2022-08-19T01:12:00+00:00, run_after=2022-08-19T01:13:00+00:00
airflow-scheduler_1  | [2022-08-19 01:12:02,243] {dagrun.py:562} INFO - Marking run <DagRun a_dag1 @ 2022-08-19 01:11:00+00:00: scheduled__2022-08-19T01:11:00+00:00, externally triggered: False> successful
airflow-scheduler_1  | [2022-08-19 01:12:02,244] {dagrun.py:607} INFO - DagRun Finished: dag_id=a_dag1, execution_date=2022-08-19 01:11:00+00:00, run_id=scheduled__2022-08-19T01:11:00+00:00, run_start_date=2022-08-19 01:12:00.111676+00:00, run_end_date=2022-08-19 01:12:02.244042+00:00, run_duration=2.132366, state=success, external_trigger=False, run_type=scheduled, data_interval_start=2022-08-19 01:11:00+00:00, data_interval_end=2022-08-19 01:12:00+00:00, dag_hash=c05eae379e808492a6614dfda6985c68
airflow-scheduler_1  | [2022-08-19 01:12:02,248] {dag.py:2915} INFO - Setting next_dagrun for a_dag1 to 2022-08-19T01:12:00+00:00, run_after=2022-08-19T01:13:00+00:00
airflow-scheduler_1  | [2022-08-19 01:12:02,250] {dagrun.py:547} ERROR - Marking run <DagRun after_dag2 @ 2022-08-19 01:11:00+00:00: scheduled__2022-08-19T01:11:00+00:00, externally triggered: False> failed
airflow-scheduler_1  | [2022-08-19 01:12:02,251] {dagrun.py:607} INFO - DagRun Finished: dag_id=after_dag2, execution_date=2022-08-19 01:11:00+00:00, run_id=scheduled__2022-08-19T01:11:00+00:00, run_start_date=2022-08-19 01:12:00.112784+00:00, run_end_date=2022-08-19 01:12:02.251044+00:00, run_duration=2.13826, state=failed, external_trigger=False, run_type=scheduled, data_interval_start=2022-08-19 01:11:00+00:00, data_interval_end=2022-08-19 01:12:00+00:00, dag_hash=a87e590bae62e97d0798e39e15be9f55
airflow-scheduler_1  | [2022-08-19 01:12:02,255] {dag.py:2915} INFO - Setting next_dagrun for after_dag2 to 2022-08-19T01:12:00+00:00, run_after=2022-08-19T01:13:00+00:00
airflow-scheduler_1  | [2022-08-19 01:12:02,300] {scheduler_job.py:596} INFO - Executor reports execution of a_dag2.downstream_task1 run_id=scheduled__2022-08-19T01:11:00+00:00 exited with status queued for try_number 1
airflow-scheduler_1  | [2022-08-19 01:12:02,300] {scheduler_job.py:596} INFO - Executor reports execution of a_dag1.wow2 run_id=scheduled__2022-08-19T01:11:00+00:00 exited with status success for try_number 1
airflow-scheduler_1  | [2022-08-19 01:12:02,300] {scheduler_job.py:596} INFO - Executor reports execution of after_dag2.wait_for_task_2 run_id=scheduled__2022-08-19T01:11:00+00:00 exited with status success for try_number 1
airflow-scheduler_1  | [2022-08-19 01:12:02,303] {scheduler_job.py:630} INFO - Setting external_id for <TaskInstance: a_dag2.downstream_task1 scheduled__2022-08-19T01:11:00+00:00 [failed]> to d43c2bef-0f56-4556-b34e-af64158e0545
airflow-scheduler_1  | [2022-08-19 01:12:02,303] {scheduler_job.py:640} INFO - TaskInstance Finished: dag_id=after_dag2, task_id=wait_for_task_2, run_id=scheduled__2022-08-19T01:11:00+00:00, map_index=-1, run_start_date=2022-08-19 01:12:00.769613+00:00, run_end_date=2022-08-19 01:12:01.117768+00:00, run_duration=0.348155, state=failed, executor_state=success, try_number=1, max_tries=0, job_id=708751, pool=default_pool, queue=default, priority_weight=2, operator=ExternalTaskSensor, queued_dttm=2022-08-19 01:12:00.990796+00:00, queued_by_job_id=650667, pid=1357729
airflow-scheduler_1  | [2022-08-19 01:12:02,303] {scheduler_job.py:640} INFO - TaskInstance Finished: dag_id=a_dag1, task_id=wow2, run_id=scheduled__2022-08-19T01:11:00+00:00, map_index=-1, run_start_date=2022-08-19 01:12:01.506658+00:00, run_end_date=2022-08-19 01:12:01.658264+00:00, run_duration=0.151606, state=success, executor_state=success, try_number=1, max_tries=0, job_id=708754, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2022-08-19 01:12:00.990796+00:00, queued_by_job_id=650667, pid=1357738
airflow-scheduler_1  | [2022-08-19 01:12:03,357] {scheduler_job.py:596} INFO - Executor reports execution of a_dag2.downstream_task1 run_id=scheduled__2022-08-19T01:11:00+00:00 exited with status success for try_number 1
airflow-scheduler_1  | [2022-08-19 01:12:03,359] {scheduler_job.py:640} INFO - TaskInstance Finished: dag_id=a_dag2, task_id=downstream_task1, run_id=scheduled__2022-08-19T01:11:00+00:00, map_index=-1, run_start_date=2022-08-19 01:12:00.871492+00:00, run_end_date=2022-08-19 01:12:01.205089+00:00, run_duration=0.333597, state=failed, executor_state=success, try_number=1, max_tries=0, job_id=708753, pool=default_pool, queue=default, priority_weight=1, operator=ExternalTaskSensor, queued_dttm=2022-08-19 01:12:01.082837+00:00, queued_by_job_id=650667, pid=1357731
airflow-scheduler_1  | [2022-08-19 01:12:05,177] {dagrun.py:562} INFO - Marking run <DagRun after_dag1 @ 2022-08-19 01:11:00+00:00: scheduled__2022-08-19T01:11:00+00:00, externally triggered: False> successful
airflow-scheduler_1  | [2022-08-19 01:12:05,177] {dagrun.py:607} INFO - DagRun Finished: dag_id=after_dag1, execution_date=2022-08-19 01:11:00+00:00, run_id=scheduled__2022-08-19T01:11:00+00:00, run_start_date=2022-08-19 01:12:00.112426+00:00, run_end_date=2022-08-19 01:12:05.177649+00:00, run_duration=5.065223, state=success, external_trigger=False, run_type=scheduled, data_interval_start=2022-08-19 01:11:00+00:00, data_interval_end=2022-08-19 01:12:00+00:00, dag_hash=edf550ca4ca8e90dfdeb6b1d2a06c789
airflow-scheduler_1  | [2022-08-19 01:12:05,182] {dag.py:2915} INFO - Setting next_dagrun for after_dag1 to 2022-08-19T01:12:00+00:00, run_after=2022-08-19T01:13:00+00:00
airflow-scheduler_1  | [2022-08-19 01:12:05,208] {scheduler_job.py:596} INFO - Executor reports execution of after_dag1.b_task run_id=scheduled__2022-08-19T01:11:00+00:00 exited with status success for try_number 1
airflow-scheduler_1  | [2022-08-19 01:12:05,211] {scheduler_job.py:640} INFO - TaskInstance Finished: dag_id=after_dag1, task_id=b_task, run_id=scheduled__2022-08-19T01:11:00+00:00, map_index=-1, run_start_date=2022-08-19 01:12:01.641734+00:00, run_end_date=2022-08-19 01:12:04.801520+00:00, run_duration=3.159786, state=success, executor_state=success, try_number=1, max_tries=0, job_id=708755, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2022-08-19 01:12:01.082837+00:00, queued_by_job_id=650667, pid=1357739


Solution

  • I tested your code with Airflow versions 2.3.0 through 2.3.3. It fails on 2.3.0 but works normally on the later versions, so it appears a bug was fixed in 2.3.1. Upgrading to 2.3.1 or later should resolve the issue.
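If an upgrade cannot happen right away, a parse-time guard in the DAG file can at least flag the affected version; a minimal sketch, assuming the packaging library is available (it ships as an Airflow dependency) and using the 2.3.1 threshold from the test above:

import warnings

from airflow import __version__ as airflow_version
from packaging.version import Version

# Emit a parse-time warning on the Airflow version where the sensor
# behaviour above was observed (reported as working again from 2.3.1).
if Version(airflow_version) < Version("2.3.1"):
    warnings.warn(
        "ExternalTaskSensor issue observed on Airflow 2.3.0; "
        "consider upgrading to 2.3.1 or later."
    )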