Tags: python, python-3.x, airflow, directed-acyclic-graphs, airflow-2.x

How to use a Python list as a global variable within @task.external_python?


GOAL:

  • Have a Python list act as a global variable shared between tasks.
  • Currently it crashes at the 1st task.
  • 1.) I am trying to have a simple Python list that is carried from one task to the next, with a few string values appended to it in task 2. So the goal is to have one shared list.
  • 2.) Even if one task fails, the DAG should just move on and not care (while still marking that task as failed).
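For goal 2, Airflow decides whether a downstream task runs based on its `trigger_rule` argument, which task decorators pass through to the operator: the default `"all_success"` requires every upstream task to succeed, while `"all_done"` only requires every upstream task to have finished, whatever its state. In a DAG this would look like `@task.external_python(task_id="two", python='/opt/airflow/venv1/bin/python3', trigger_rule="all_done")`. The snippet below is only a simplified, hypothetical model of those two rules to show the difference; `should_run` and `FINISHED_STATES` are illustrative names, not Airflow APIs, and real Airflow tracks more states and rules than this.

```python
# Simplified, hypothetical model of two Airflow trigger rules (not Airflow code).
FINISHED_STATES = {"success", "failed", "skipped", "upstream_failed"}


def should_run(trigger_rule, upstream_states):
    """Return True if a downstream task would start under this simplified model."""
    if trigger_rule == "all_success":
        # Default rule: every upstream task must have succeeded.
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "all_done":
        # Every upstream task only has to be finished, failed or not.
        return all(state in FINISHED_STATES for state in upstream_states)
    raise ValueError(f"trigger rule not modelled here: {trigger_rule}")


print(should_run("all_success", ["failed"]))  # False: default rule blocks after a failure
print(should_run("all_done", ["failed"]))     # True: runs anyway once upstream is finished
print(should_run("all_done", ["running"]))    # False: upstream has not finished yet
```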

SETUP:

  • I am on Airflow 2.4.1
  • I use Airflow in Docker and built a Python environment that I have used many times and that works fine.

MY CODE:

from __future__ import annotations
import logging
import os
import shutil
import sys
import tempfile
import time
from pprint import pprint
import pendulum
from airflow import DAG
from airflow.decorators import task

log = logging.getLogger(__name__)
PYTHON = sys.executable
BASE_DIR = tempfile.gettempdir()

my_default_args = {
    'owner': 'me',
    'email': ['some_email@some_email.com'],
    'email_on_failure': True,
    'email_on_retry': False, 
    'write_successes': [],
}

with DAG(
    dag_id='my_dag_id',
    schedule='9 9 * * *',
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup=False,
    default_args=my_default_args,
    tags=['a', 'b'],
    ) as dag:

    @task.external_python(task_id="one", python='/opt/airflow/venv1/bin/python3')
    def first(**kwargs):
        task_id="one"
        write_successes = kwargs.get('write_successes', [])

        print(write_successes)
        write_successes.append(99)
        print(write_successes)


    @task.external_python(task_id="two", python='/opt/airflow/venv1/bin/python3')
    def second(**kwargs):
        write_successes = kwargs.get('write_successes', [])

        print(write_successes)
        write_successes.append(101)
        print(write_successes)


    one = first()
    two = second()

    one >> two
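Even without the crash, appending to `write_successes` inside `first` and `second` would not build one shared list: each `@task.external_python` task runs its function in a separate interpreter process (the successful log further down shows Airflow launching `/opt/airflow/venv1/bin/python3` on a generated `script.py` with `script.in`/`script.out` files), so each task only ever works on a copy of its inputs. The sketch below imitates that with `subprocess` and a JSON-encoded argument instead of Airflow's pickled files; `run_in_subprocess` is a hypothetical helper, not part of Airflow.

```python
import json
import subprocess
import sys

write_successes = []  # "global" list living in the parent process


def run_in_subprocess(values):
    """Hypothetical helper: run code in a child interpreter on a copy of `values`."""
    # The child receives a serialized copy, appends to it, and prints the result.
    child_code = (
        "import json, sys\n"
        "lst = json.loads(sys.argv[1])\n"
        "lst.append(99)\n"
        "print(json.dumps(lst))\n"
    )
    result = subprocess.run(
        [sys.executable, "-c", child_code, json.dumps(values)],
        capture_output=True,
        text=True,
        check=True,
    )
    return json.loads(result.stdout)


returned = run_in_subprocess(write_successes)
print(write_successes)  # [] : the parent's list was never touched
print(returned)         # [99] : the change is only visible through the return value
```

This is why the usual pattern is to return the list from one task and pass it into the next, rather than mutating a shared object.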

ERROR:

*** Reading local file: /opt/airflow/logs/dag_id=test_global_variable/run_id=scheduled__2023-02-05T09:09:00+00:00/task_id=one/attempt=1.log
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.one scheduled__2023-02-05T09:09:00+00:00 [queued]>
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.one scheduled__2023-02-05T09:09:00+00:00 [queued]>
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1362} INFO - 
--------------------------------------------------------------------------------
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1364} INFO - 
--------------------------------------------------------------------------------
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): one> on 2023-02-05 09:09:00+00:00
[2023-02-06, 12:24:43 GMT] {standard_task_runner.py:54} INFO - Started process 239657 to run task
[2023-02-06, 12:24:43 GMT] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_global_variable', 'one', 'scheduled__2023-02-05T09:09:00+00:00', '--job-id', '72751', '--raw', '--subdir', 'DAGS_FOLDER/test_global_variable.py', '--cfg-path', '/tmp/tmpxldmrzpp']
[2023-02-06, 12:24:43 GMT] {standard_task_runner.py:83} INFO - Job 72751: Subtask one
[2023-02-06, 12:24:43 GMT] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_global_variable.py
[2023-02-06, 12:24:43 GMT] {task_command.py:384} INFO - Running <TaskInstance: test_global_variable.one scheduled__2023-02-05T09:09:00+00:00 [running]> on host 4851b30aa5cf
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=me
AIRFLOW_CTX_DAG_ID=test_global_variable
AIRFLOW_CTX_TASK_ID=one
AIRFLOW_CTX_EXECUTION_DATE=2023-02-05T09:09:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2023-02-05T09:09:00+00:00
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_start' or 'logical_date' instead.
  warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))

[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'next_ds' from the template is deprecated and will be removed in a future version. Please use '{{ data_interval_end | ds }}' instead.
  warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))

[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'next_ds_nodash' from the template is deprecated and will be removed in a future version. Please use '{{ data_interval_end | ds_nodash }}' instead.
  warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))

[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'next_execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end' instead.
  warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))

[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'prev_ds' from the template is deprecated and will be removed in a future version.
  warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))

[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'prev_ds_nodash' from the template is deprecated and will be removed in a future version.
  warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))

[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'prev_execution_date' from the template is deprecated and will be removed in a future version.
  warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))

[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'prev_execution_date_success' from the template is deprecated and will be removed in a future version. Please use 'prev_data_interval_start_success' instead.
  warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))

[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'tomorrow_ds' from the template is deprecated and will be removed in a future version.
  warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))

[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'tomorrow_ds_nodash' from the template is deprecated and will be removed in a future version.
  warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))

[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'yesterday_ds' from the template is deprecated and will be removed in a future version.
  warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))

[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'yesterday_ds_nodash' from the template is deprecated and will be removed in a future version.
  warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))

[2023-02-06, 12:24:44 GMT] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 188, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 370, in execute
    return super().execute(context=serializable_context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 678, in execute_callable
    return self._execute_python_callable_in_subprocess(python_path, tmp_path)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 411, in _execute_python_callable_in_subprocess
    self._write_args(input_path)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 381, in _write_args
    file.write_bytes(self.pickling_library.dumps({'args': self.op_args, 'kwargs': self.op_kwargs}))
_pickle.PicklingError: Can't pickle <function first at 0x7f80ff76e4c0>: it's not the same object as unusual_prefix_6cc7442bed7c02593e3a29524b0e65329d9f59da_test_global_variable.first
[2023-02-06, 12:24:44 GMT] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_global_variable, task_id=one, execution_date=20230205T090900, start_date=20230206T122443, end_date=20230206T122444
[2023-02-06, 12:24:44 GMT] {standard_task_runner.py:102} ERROR - Failed to execute job 72751 for task one (Can't pickle <function first at 0x7f80ff76e4c0>: it's not the same object as unusual_prefix_6cc7442bed7c02593e3a29524b0e65329d9f59da_test_global_variable.first; 239657)
[2023-02-06, 12:24:44 GMT] {local_task_job.py:164} INFO - Task exited with return code 1
[2023-02-06, 12:24:44 GMT] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
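The traceback shows the failure happens in `_write_args`, i.e. while Airflow pickles the callable's arguments for the external interpreter, before the body of `first` ever runs; something reachable from those arguments (presumably the context requested via `**kwargs`) references the function `first`. `pickle` stores a plain function by reference (module name plus qualified name) and refuses to pickle it when that name no longer resolves to the same object. The plain-Python sketch below reproduces that kind of error with a hypothetical `wrap` decorator that, loosely like `@task`, rebinds the module-level name to a wrapper object; it does not use Airflow and only illustrates the pickle behaviour.

```python
import pickle


class TaskWrapper:
    """Hypothetical stand-in for the object a decorator such as @task returns."""

    def __init__(self, function):
        self.function = function  # keep a handle on the raw function


def wrap(function):
    # Returning a different object rebinds the decorated name to the wrapper.
    return TaskWrapper(function)


@wrap
def first():
    return [99]


def can_pickle(obj):
    """Return True if `obj` pickles; otherwise print the error and return False."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError) as exc:
        print(f"{type(exc).__name__}: {exc}")
        return False


# The module-level name `first` now points at the TaskWrapper, so pickling the
# raw function by reference fails (typically "... it's not the same object as ...first").
print(can_pickle(first.function))  # False
# Plain data such as a list of ints pickles without trouble.
print(can_pickle([23, 5, 8]))       # True
```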

I have tried to fix it based on the following posts:

  • I have tried global Python variables, which did not work at all.

  • Global variables in Airflow: I have separate "task.external_python" tasks, which makes it impossible to use the approach from this post.

  • Mine is not a class issue: List as global variable inside a class in Python

  • Might be interesting, but I have a separate Python venv for each task: https://stackoverflow.com/a/58804409/10270590

  • I could not get Airflow XCom working.

  • @TJaniF -> I tried the following code. (On a 2nd run it worked, but on the 1st run with the same code I got the following result:) the long top bar was marked as failed and the single square below it as success, but there was no square below that one at all. I don't understand this.

from airflow.decorators import dag, task
from pendulum import datetime


@dag(
    dag_id='test_global_variable',
    start_date=datetime(2022,12,10),
    schedule=None,
    catchup=False,)
def write_var():

    @task.external_python(task_id="task_1", python='/opt/airflow/venv1/bin/python3')
    def add_to_list(my_list):
        print(my_list)
        my_list.append(19)
        return my_list


    @task.external_python(task_id="task_2", python='/opt/airflow/venv1/bin/python3')
    def add_to_list_2(my_list):
        print(my_list)
        my_list.append(42)
        return my_list

    add_to_list_2(add_to_list([23, 5, 8]))


write_var()

LOG from the successful task

[2023-02-06, 15:36:52 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_1 manual__2023-02-06T15:36:51.225176+00:00 [queued]>
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_1 manual__2023-02-06T15:36:51.225176+00:00 [queued]>
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1362} INFO - 
--------------------------------------------------------------------------------
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1364} INFO - 
--------------------------------------------------------------------------------
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): task_1> on 2023-02-06 15:36:51.225176+00:00
[2023-02-06, 15:36:52 GMT] {standard_task_runner.py:54} INFO - Started process 249785 to run task
[2023-02-06, 15:36:52 GMT] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_global_variable', 'task_1', 'manual__2023-02-06T15:36:51.225176+00:00', '--job-id', '72908', '--raw', '--subdir', 'DAGS_FOLDER/test_global_variable.py', '--cfg-path', '/tmp/tmpuw6bfiif']
[2023-02-06, 15:36:52 GMT] {standard_task_runner.py:83} INFO - Job 72908: Subtask task_1
[2023-02-06, 15:36:52 GMT] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_global_variable.py
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {task_command.py:384} INFO - Running <TaskInstance: test_global_variable.task_1 manual__2023-02-06T15:36:51.225176+00:00 [running]> on host 4851b30aa5cf
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_global_variable
AIRFLOW_CTX_TASK_ID=task_1
AIRFLOW_CTX_EXECUTION_DATE=2023-02-06T15:36:51.225176+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-02-06T15:36:51.225176+00:00
[2023-02-06, 15:36:53 GMT] {process_utils.py:179} INFO - Executing cmd: /opt/airflow/venv1/bin/python3 /tmp/tmd35abbbcv/script.py /tmp/tmd35abbbcv/script.in /tmp/tmd35abbbcv/script.out /tmp/tmd35abbbcv/string_args.txt
[2023-02-06, 15:36:53 GMT] {process_utils.py:183} INFO - Output:
[2023-02-06, 15:36:54 GMT] {process_utils.py:187} INFO - [23, 5, 8]
[2023-02-06, 15:36:54 GMT] {python.py:177} INFO - Done. Returned value was: [23, 5, 8, 19]
[2023-02-06, 15:36:54 GMT] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=test_global_variable, task_id=task_1, execution_date=20230206T153651, start_date=20230206T153652, end_date=20230206T153654
[2023-02-06, 15:36:54 GMT] {local_task_job.py:164} INFO - Task exited with return code 0
[2023-02-06, 15:36:54 GMT] {local_task_job.py:273} INFO - 1 downstream tasks scheduled from follow-on schedule check

Screenshot: Issue (image not preserved)


Solution

  • I'm curious what you tried for Airflow XCom. The following DAG passes a list from one task to another using XCom via the TaskFlow API. I tested it on Airflow 2.5.1, but it should work the same way on 2.4.1.

    from airflow.decorators import dag, task
    from pendulum import datetime
    
    
    @dag(
        start_date=datetime(2022,12,10),
        schedule=None,
        catchup=False,
    )
    def write_var():
    
        @task.external_python(
            task_id="task_1",
            python='/home/astro/.pyenv/versions/my_env/bin/python'
        )
        def add_to_list(my_list):
            print(my_list)
            my_list.append(19)
            return my_list
    
    
        @task.external_python(
            task_id="task_2",
            python='/home/astro/.pyenv/versions/my_env/bin/python'
        )
        def add_to_list_2(my_list):
            print(my_list)
            my_list.append(42)
            return my_list
    
        add_to_list_2(add_to_list([23, 5, 8]))
    
    
    write_var()
    

    Screenshot: Grid view (image not preserved)
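The DAG above works because each TaskFlow task's return value is pushed as an XCom and handed to the downstream task as an argument, so `task_2` receives a fresh copy of `task_1`'s result rather than a shared object. The sketch below mimics that hand-off in plain Python; `xcom_store` and `run_task` are hypothetical stand-ins (a dict plus a JSON round-trip) for the metadata database and the default XCom serialization, not Airflow APIs.

```python
import json

# Hypothetical stand-in for the XCom table in Airflow's metadata database.
xcom_store = {}


def add_to_list(my_list):
    my_list.append(19)
    return my_list


def add_to_list_2(my_list):
    my_list.append(42)
    return my_list


def run_task(task_id, func, upstream_task_id=None, initial=None):
    """Hypothetical runner: pull the upstream XCom (if any), call func, push the result."""
    if upstream_task_id is not None:
        # Each task gets a freshly deserialized copy, not the upstream's object.
        arg = json.loads(xcom_store[upstream_task_id])
    else:
        arg = initial
    xcom_store[task_id] = json.dumps(func(arg))


run_task("task_1", add_to_list, initial=[23, 5, 8])
run_task("task_2", add_to_list_2, upstream_task_id="task_1")

print(json.loads(xcom_store["task_1"]))  # [23, 5, 8, 19]
print(json.loads(xcom_store["task_2"]))  # [23, 5, 8, 19, 42]
```

Because the list is serialized between tasks, its items should be JSON-serializable (strings and numbers are fine), assuming the default XCom backend with pickling disabled.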