from __future__ import annotations
import logging
import os
import shutil
import sys
import tempfile
import time
from pprint import pprint
import pendulum
from airflow import DAG
from airflow.decorators import task
log = logging.getLogger(__name__)
PYTHON = sys.executable
BASE_DIR = tempfile.gettempdir()
my_default_args = {
'owner': 'me',
'email': ['some_email@some_email.com'],
'email_on_failure': True,
'email_on_retry': False,
'write_successes': [],
}
with DAG(
dag_id='my_dag_id',
schedule='9 9 * * *',
start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
catchup=False,
default_args=my_default_args,
tags=['a', 'b'],
) as dag:
@task.external_python(task_id="one", python='/opt/airflow/venv1/bin/python3')
def first(**kwargs):
task_id="one"
write_successes = kwargs.get('write_successes', [])
print(write_successes)
write_successes.append(99)
print(write_successes)
@task.external_python(task_id="two", python='/opt/airflow/venv1/bin/python3')
def second(**kwargs):
write_successes = kwargs.get('write_successes', [])
print(write_successes)
write_successes.append(101)
print(write_successes)
one = first()
two = second()
one >> two
*** Reading local file: /opt/airflow/logs/dag_id=test_global_variable/run_id=scheduled__2023-02-05T09:09:00+00:00/task_id=one/attempt=1.log
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.one scheduled__2023-02-05T09:09:00+00:00 [queued]>
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.one scheduled__2023-02-05T09:09:00+00:00 [queued]>
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): one> on 2023-02-05 09:09:00+00:00
[2023-02-06, 12:24:43 GMT] {standard_task_runner.py:54} INFO - Started process 239657 to run task
[2023-02-06, 12:24:43 GMT] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_global_variable', 'one', 'scheduled__2023-02-05T09:09:00+00:00', '--job-id', '72751', '--raw', '--subdir', 'DAGS_FOLDER/test_global_variable.py', '--cfg-path', '/tmp/tmpxldmrzpp']
[2023-02-06, 12:24:43 GMT] {standard_task_runner.py:83} INFO - Job 72751: Subtask one
[2023-02-06, 12:24:43 GMT] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_global_variable.py
[2023-02-06, 12:24:43 GMT] {task_command.py:384} INFO - Running <TaskInstance: test_global_variable.one scheduled__2023-02-05T09:09:00+00:00 [running]> on host 4851b30aa5cf
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=me
AIRFLOW_CTX_DAG_ID=test_global_variable
AIRFLOW_CTX_TASK_ID=one
AIRFLOW_CTX_EXECUTION_DATE=2023-02-05T09:09:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2023-02-05T09:09:00+00:00
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_start' or 'logical_date' instead.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'next_ds' from the template is deprecated and will be removed in a future version. Please use '{{ data_interval_end | ds }}' instead.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'next_ds_nodash' from the template is deprecated and will be removed in a future version. Please use '{{ data_interval_end | ds_nodash }}' instead.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'next_execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end' instead.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'prev_ds' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'prev_ds_nodash' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'prev_execution_date' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'prev_execution_date_success' from the template is deprecated and will be removed in a future version. Please use 'prev_data_interval_start_success' instead.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'tomorrow_ds' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'tomorrow_ds_nodash' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'yesterday_ds' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'yesterday_ds_nodash' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 188, in execute
return_value = super().execute(context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 370, in execute
return super().execute(context=serializable_context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 678, in execute_callable
return self._execute_python_callable_in_subprocess(python_path, tmp_path)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 411, in _execute_python_callable_in_subprocess
self._write_args(input_path)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 381, in _write_args
file.write_bytes(self.pickling_library.dumps({'args': self.op_args, 'kwargs': self.op_kwargs}))
_pickle.PicklingError: Can't pickle <function first at 0x7f80ff76e4c0>: it's not the same object as unusual_prefix_6cc7442bed7c02593e3a29524b0e65329d9f59da_test_global_variable.first
[2023-02-06, 12:24:44 GMT] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_global_variable, task_id=one, execution_date=20230205T090900, start_date=20230206T122443, end_date=20230206T122444
[2023-02-06, 12:24:44 GMT] {standard_task_runner.py:102} ERROR - Failed to execute job 72751 for task one (Can't pickle <function first at 0x7f80ff76e4c0>: it's not the same object as unusual_prefix_6cc7442bed7c02593e3a29524b0e65329d9f59da_test_global_variable.first; 239657)
[2023-02-06, 12:24:44 GMT] {local_task_job.py:164} INFO - Task exited with return code 1
[2023-02-06, 12:24:44 GMT] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
I have tried global python variables that did not worked at all
Global variables in Airflow - i have separate "task.external_python" that makes it not possible to use the following post.
Mine is not a class issue - List as global variable inside a class in Python
might be interesting but I have separate python venve for each task - https://stackoverflow.com/a/58804409/10270590
I could not get Airflow XCOM working
@TJaniF -> (I have retried this a 2nd time than it have worked but on the 1st run with the same code I got the following results:) I have tried the following code the long top bar is marked as Failed but a single square bellow marked as success but then there was no square below that square at all. I don't understand this
from airflow.decorators import dag, task
from pendulum import datetime
@dag(
dag_id='test_global_variable',
start_date=datetime(2022,12,10),
schedule=None,
catchup=False,)
def write_var():
@task.external_python(task_id="task_1", python='/opt/airflow/venv1/bin/python3')
def add_to_list(my_list):
print(my_list)
my_list.append(19)
return my_list
@task.external_python(task_id="task_2", python='/opt/airflow/venv1/bin/python3')
def add_to_list_2(my_list):
print(my_list)
my_list.append(42)
return my_list
add_to_list_2(add_to_list([23, 5, 8]))
write_var()
LOG From the succesful task
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_1 manual__2023-02-06T15:36:51.225176+00:00 [queued]>
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_1 manual__2023-02-06T15:36:51.225176+00:00 [queued]>
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): task_1> on 2023-02-06 15:36:51.225176+00:00
[2023-02-06, 15:36:52 GMT] {standard_task_runner.py:54} INFO - Started process 249785 to run task
[2023-02-06, 15:36:52 GMT] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_global_variable', 'task_1', 'manual__2023-02-06T15:36:51.225176+00:00', '--job-id', '72908', '--raw', '--subdir', 'DAGS_FOLDER/test_global_variable.py', '--cfg-path', '/tmp/tmpuw6bfiif']
[2023-02-06, 15:36:52 GMT] {standard_task_runner.py:83} INFO - Job 72908: Subtask task_1
[2023-02-06, 15:36:52 GMT] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_global_variable.py
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {task_command.py:384} INFO - Running <TaskInstance: test_global_variable.task_1 manual__2023-02-06T15:36:51.225176+00:00 [running]> on host 4851b30aa5cf
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_global_variable
AIRFLOW_CTX_TASK_ID=task_1
AIRFLOW_CTX_EXECUTION_DATE=2023-02-06T15:36:51.225176+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-02-06T15:36:51.225176+00:00
[2023-02-06, 15:36:53 GMT] {process_utils.py:179} INFO - Executing cmd: /opt/airflow/venv1/bin/python3 /tmp/tmd35abbbcv/script.py /tmp/tmd35abbbcv/script.in /tmp/tmd35abbbcv/script.out /tmp/tmd35abbbcv/string_args.txt
[2023-02-06, 15:36:53 GMT] {process_utils.py:183} INFO - Output:
[2023-02-06, 15:36:54 GMT] {process_utils.py:187} INFO - [23, 5, 8]
[2023-02-06, 15:36:54 GMT] {python.py:177} INFO - Done. Returned value was: [23, 5, 8, 19]
[2023-02-06, 15:36:54 GMT] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=test_global_variable, task_id=task_1, execution_date=20230206T153651, start_date=20230206T153652, end_date=20230206T153654
[2023-02-06, 15:36:54 GMT] {local_task_job.py:164} INFO - Task exited with return code 0
[2023-02-06, 15:36:54 GMT] {local_task_job.py:273} INFO - 1 downstream tasks scheduled from follow-on schedule check
I'm curious what your tried for Airflow XCom? The following DAG passes a list from one task to another using XCom via the TaskFlow API. Tested for Airflow 2.5.1, but it should work the same with 2.4.1.
from airflow.decorators import dag, task
from pendulum import datetime
@dag(
start_date=datetime(2022,12,10),
schedule=None,
catchup=False,
)
def write_var():
@task.external_python(
task_id="task_1",
python='/home/astro/.pyenv/versions/my_env/bin/python'
)
def add_to_list(my_list):
print(my_list)
my_list.append(19)
return my_list
@task.external_python(
task_id="task_2",
python='/home/astro/.pyenv/versions/my_env/bin/python'
)
def add_to_list_2(my_list):
print(my_list)
my_list.append(42)
return my_list
add_to_list_2(add_to_list([23, 5, 8]))
write_var()
Screenshot: