Tags: celery, airflow

Setup Airflow with remote Celery worker


I have Apache Airflow set up on a virtual machine within the local network and would like an extra Celery worker running on my local machine that still syncs with the rest of the Airflow system. So far, after I start the worker on my local machine, the DAGs present on the local machine are not visible on the webserver (which runs on the VM) right away, but they do appear briefly after I run airflow dags reserialize on the local machine.
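
For reference, a remote Celery worker normally has to point at the same metadata database and message broker as the main node. The sketch below is just a rough illustration of that kind of setup, using Airflow's standard AIRFLOW__SECTION__KEY environment-variable overrides; the hostname (vm-host) and connection strings are placeholders, not my actual configuration:

# On the local machine, assuming the same Airflow version as on the VM
# and placeholder hostnames/credentials:
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:***@vm-host/airflow
export AIRFLOW__CELERY__BROKER_URL=redis://vm-host:6379/0
export AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:***@vm-host/airflow
export AIRFLOW__CORE__DAGS_FOLDER=/Users/wilbertung/Documents/lowitest/airflow/dags

# Start the worker so it registers with the main node's broker:
airflow celery worker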

I get these messages in the worker logs after running the reserialize command:

[2022-06-07 09:54:41,661] {dagbag.py:507} INFO - Filling up the DagBag from /Users/wilbertung/Documents/lowitest/airflow/dags
[2022-06-07 09:54:41,680] {dagbag.py:507} INFO - Filling up the DagBag from None
[2022-06-07 09:54:41,809] {dag.py:2379} INFO - Sync 2 DAGs
[2022-06-07 09:54:41,853] {dag.py:2923} INFO - Setting next_dagrun for ChiSo to 2022-06-06T01:54:41.852752+00:00, run_after=2022-06-07T01:54:41.852752+00:00
[2022-06-07 09:54:41,853] {dag.py:2923} INFO - Setting next_dagrun for lowi17 to 2022-06-06T16:00:00+00:00, run_after=2022-06-07T16:00:00+00:00

Then, in the scheduler logs I get the following messages:

[2022-06-07 09:54:42,473] {scheduler_job.py:353} INFO - 3 tasks up for execution:
    <TaskInstance: lowi17.台灣醒報 manual__2022-06-06T06:00:03.787848+00:00 [scheduled]>
    <TaskInstance: lowi17.台灣新生報 manual__2022-06-06T06:00:03.787848+00:00 [scheduled]>
    <TaskInstance: lowi17.華視新聞網 manual__2022-06-06T06:00:03.787848+00:00 [scheduled]>
[2022-06-07 09:54:42,473] {scheduler_job.py:418} INFO - DAG lowi17 has 0/16 running and queued tasks
[2022-06-07 09:54:42,473] {scheduler_job.py:418} INFO - DAG lowi17 has 1/16 running and queued tasks
[2022-06-07 09:54:42,473] {scheduler_job.py:418} INFO - DAG lowi17 has 2/16 running and queued tasks
[2022-06-07 09:54:42,473] {scheduler_job.py:504} INFO - Setting the following tasks to queued state:
    <TaskInstance: lowi17.台灣醒報 manual__2022-06-06T06:00:03.787848+00:00 [scheduled]>
    <TaskInstance: lowi17.台灣新生報 manual__2022-06-06T06:00:03.787848+00:00 [scheduled]>
    <TaskInstance: lowi17.華視新聞網 manual__2022-06-06T06:00:03.787848+00:00 [scheduled]>
[2022-06-07 09:54:42,476] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='lowi17', task_id='台灣醒報', run_id='manual__2022-06-06T06:00:03.787848+00:00', try_number=3, map_index=-1) to executor with priority 1 and queue default
[2022-06-07 09:54:42,476] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'lowi17', '台灣醒報', 'manual__2022-06-06T06:00:03.787848+00:00', '--local', '--subdir', '/Users/wilbertung/Documents/lowitest/airflow/dags/DAG_lowi50.py']
[2022-06-07 09:54:42,477] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='lowi17', task_id='台灣新生報', run_id='manual__2022-06-06T06:00:03.787848+00:00', try_number=3, map_index=-1) to executor with priority 1 and queue default
[2022-06-07 09:54:42,477] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'lowi17', '台灣新生報', 'manual__2022-06-06T06:00:03.787848+00:00', '--local', '--subdir', '/Users/wilbertung/Documents/lowitest/airflow/dags/DAG_lowi50.py']
[2022-06-07 09:54:42,477] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='lowi17', task_id='華視新聞網', run_id='manual__2022-06-06T06:00:03.787848+00:00', try_number=3, map_index=-1) to executor with priority 1 and queue default
[2022-06-07 09:54:42,477] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'lowi17', '華視新聞網', 'manual__2022-06-06T06:00:03.787848+00:00', '--local', '--subdir', '/Users/wilbertung/Documents/lowitest/airflow/dags/DAG_lowi50.py']
[2022-06-07 09:54:42,621] {scheduler_job.py:599} INFO - Executor reports execution of lowi17.台灣醒報 run_id=manual__2022-06-06T06:00:03.787848+00:00 exited with status failed for try_number 3
[2022-06-07 09:54:42,621] {scheduler_job.py:599} INFO - Executor reports execution of lowi17.台灣新生報 run_id=manual__2022-06-06T06:00:03.787848+00:00 exited with status failed for try_number 3
[2022-06-07 09:54:42,621] {scheduler_job.py:599} INFO - Executor reports execution of lowi17.華視新聞網 run_id=manual__2022-06-06T06:00:03.787848+00:00 exited with status failed for try_number 3
[2022-06-07 09:54:42,626] {scheduler_job.py:643} INFO - TaskInstance Finished: dag_id=lowi17, task_id=台灣醒報, run_id=manual__2022-06-06T06:00:03.787848+00:00, map_index=-1, run_start_date=2022-06-06 06:00:06.678844+00:00, run_end_date=2022-06-06 06:51:33.138733+00:00, run_duration=3086.459889, state=queued, executor_state=failed, try_number=3, max_tries=2, job_id=83, pool=default_pool, queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2022-06-07 01:54:42.474017+00:00, queued_by_job_id=100, pid=31538
[2022-06-07 09:54:42,627] {scheduler_job.py:672} ERROR - Executor reports task instance <TaskInstance: lowi17.台灣醒報 manual__2022-06-06T06:00:03.787848+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2022-06-07 09:54:42,639] {scheduler_job.py:643} INFO - TaskInstance Finished: dag_id=lowi17, task_id=台灣新生報, run_id=manual__2022-06-06T06:00:03.787848+00:00, map_index=-1, run_start_date=2022-06-06 06:00:06.005933+00:00, run_end_date=2022-06-06 06:51:33.156305+00:00, run_duration=3087.150372, state=queued, executor_state=failed, try_number=3, max_tries=2, job_id=85, pool=default_pool, queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2022-06-07 01:54:42.474017+00:00, queued_by_job_id=100, pid=31535
[2022-06-07 09:54:42,639] {scheduler_job.py:672} ERROR - Executor reports task instance <TaskInstance: lowi17.台灣新生報 manual__2022-06-06T06:00:03.787848+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2022-06-07 09:54:42,645] {scheduler_job.py:643} INFO - TaskInstance Finished: dag_id=lowi17, task_id=華視新聞網, run_id=manual__2022-06-06T06:00:03.787848+00:00, map_index=-1, run_start_date=None, run_end_date=2022-06-06 06:51:33.162201+00:00, run_duration=None, state=queued, executor_state=failed, try_number=3, max_tries=2, job_id=None, pool=default_pool, queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2022-06-07 01:54:42.474017+00:00, queued_by_job_id=100, pid=None
[2022-06-07 09:54:42,645] {scheduler_job.py:672} ERROR - Executor reports task instance <TaskInstance: lowi17.華視新聞網 manual__2022-06-06T06:00:03.787848+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2022-06-07 09:54:42,672] {dagrun.py:547} ERROR - Marking run <DagRun lowi17 @ 2022-06-06 06:00:03.787848+00:00: manual__2022-06-06T06:00:03.787848+00:00, externally triggered: True> failed
[2022-06-07 09:54:42,672] {dagrun.py:607} INFO - DagRun Finished: dag_id=lowi17, execution_date=2022-06-06 06:00:03.787848+00:00, run_id=manual__2022-06-06T06:00:03.787848+00:00, run_start_date=2022-06-06 06:00:03.844994+00:00, run_end_date=2022-06-07 01:54:42.672853+00:00, run_duration=71678.827859, state=failed, external_trigger=True, run_type=manual, data_interval_start=2022-06-05 06:00:03.787848+00:00, data_interval_end=2022-06-06 06:00:03.787848+00:00, dag_hash=7f2d9c074e59bc29ace385f688864720
[2022-06-07 09:54:42,675] {dag.py:2923} INFO - Setting next_dagrun for lowi17 to 2022-06-06T06:00:03.787848+00:00, run_after=2022-06-07T06:00:03.787848+00:00

After this moment, the DAG becomes invisible on the webserver again, as if it never existed. I am sure I am missing some important configuration somewhere. If so, which one?


Solution

  • Basically, even though there may be a way to make this work with the DAG files in different absolute paths (as long as the relative layout is the same), the most common and direct method, and the one I went with, was to mount a shared folder on both the main node and the remote worker so that they both access the same DAG folder. A rough sketch of that setup is shown below, after the link.

    More details about it can be found here: https://github.com/apache/airflow/discussions/24275
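
    As an illustration of the shared-folder approach: the sketch below assumes the main node exports the DAG directory over NFS; the export path, mount command, and hostname (vm-host) are hypothetical and will differ depending on how the share is actually set up.

    # On the main node (VM): export the DAG folder, e.g. via an NFS export such as
    #   /srv/airflow/dags   (hypothetical path)

    # On the remote worker (macOS here): mount that share over the local DAG folder:
    sudo mount -t nfs vm-host:/srv/airflow/dags /Users/wilbertung/Documents/lowitest/airflow/dags

    # Each machine points dags_folder at its own view of the same directory, e.g.:
    #   VM:     AIRFLOW__CORE__DAGS_FOLDER=/srv/airflow/dags
    #   worker: AIRFLOW__CORE__DAGS_FOLDER=/Users/wilbertung/Documents/lowitest/airflow/dags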