from airflow.decorators import dag, task
from pendulum import datetime
from datetime import timedelta
# Task-level defaults meant to be handed to the DAG via ``default_args``.
# Only the owner is active; the commented-out entries are optional settings
# the author kept for reference.
my_default_args = {
    'owner': 'Anonymus',
    # 'email': [''],
    # 'email_on_failure': True,
    # 'email_on_retry': False, #only allow if it was allowed in the scheduler
    # 'retries': 1, #only allow if it was allowed in the scheduler
    # 'retry_delay': timedelta(minutes=1),
    # 'depends_on_past': False,
}


# NOTE(review): the opening of the @dag(...) call and the closing brace of the
# dict above were missing from this snippet and are reconstructed here.
# dag_id matches the id that appears in the task log; default_args is presumed
# to be my_default_args; start_date is a placeholder -- TODO confirm the real
# value.
@dag(
    dag_id='test_global_variable',
    default_args=my_default_args,
    schedule='12 11 * * *',
    start_date=datetime(2023, 1, 1),
    tags=['sample_tag', 'sample_tag2'],
)
def write_var():
    """Define a two-task pipeline that passes a DataFrame between tasks.

    Both tasks run through ``@task.external_python`` in the interpreter at
    ``/opt/airflow/v1/bin/python3``; ``task_1`` builds a DataFrame and
    ``task_2`` appends one row to it.
    """

    @task.external_python(task_id="task_1", python='/opt/airflow/v1/bin/python3')
    def add_to_list(my_list):
        """Build a DataFrame from ``my_list`` and return it."""
        import pandas as pd
        df = pd.DataFrame(my_list)
        # NOTE: the DataFrame itself is handed to the downstream task. The
        # task log shows task_2 failing in ``pickle.load`` with a pandas
        # AttributeError, i.e. the pickled DataFrame could not be restored by
        # the pandas installed in the external interpreter.
        return df

    @task.external_python(task_id="task_2", python='/opt/airflow/v1/bin/python3')
    def add_to_list_2(df):
        """Append the value 19 as a new row to ``df`` and return the result."""
        df = df.append([19])
        return df

    add_to_list_2(add_to_list([23, 5, 8]))


# NOTE(review): a @dag-decorated function has to be called at module level for
# the DAG to be registered; the call was not visible in the snippet and is
# restored here -- TODO confirm against the original file.
write_var()
*** Reading local file: /opt/airflow/logs/dag_id=test_global_variable/run_id=manual__2023-02-07T14:06:17.432734+00:00/task_id=task_2/attempt=1.log
[2023-02-07, 14:06:22 GMT] {} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_2 manual__2023-02-07T14:06:17.432734+00:00 [queued]>
[2023-02-07, 14:06:22 GMT] {} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_2 manual__2023-02-07T14:06:17.432734+00:00 [queued]>
[2023-02-07, 14:06:22 GMT] {} INFO -
[2023-02-07, 14:06:22 GMT] {} INFO - Starting attempt 1 of 1
[2023-02-07, 14:06:22 GMT] {} INFO -
[2023-02-07, 14:06:22 GMT] {} INFO - Executing <Task(_PythonExternalDecoratedOperator): task_2> on 2023-02-07 14:06:17.432734+00:00
[2023-02-07, 14:06:22 GMT] {} INFO - Started process 324831 to run task
[2023-02-07, 14:06:22 GMT] {} INFO - Running: ['airflow', 'tasks', 'run', 'test_global_variable', 'task_2', 'manual__2023-02-07T14:06:17.432734+00:00', '--job-id', '74080', '--raw', '--subdir', 'DAGS_FOLDER/', '--cfg-path', '/tmp/tmpbm8tkk1i']
[2023-02-07, 14:06:22 GMT] {} INFO - Job 74080: Subtask task_2
[2023-02-07, 14:06:22 GMT] {} INFO - Filling up the DagBag from /opt/airflow/dags/
[2023-02-07, 14:06:22 GMT] {} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {} INFO - Running <TaskInstance: test_global_variable.task_2 manual__2023-02-07T14:06:17.432734+00:00 [running]> on host 4851b30aa5cf
[2023-02-07, 14:06:22 GMT] {} INFO - Exporting the following env vars:
[2023-02-07, 14:06:23 GMT] {} INFO - Executing cmd: /opt/airflow/venv1/bin/python3 /tmp/tmddiox599m/ /tmp/tmddiox599m/ /tmp/tmddiox599m/script.out /tmp/tmddiox599m/string_args.txt
[2023-02-07, 14:06:23 GMT] {} INFO - Output:
[2023-02-07, 14:06:24 GMT] {} INFO - Traceback (most recent call last):
[2023-02-07, 14:06:24 GMT] {} INFO - File "/tmp/tmddiox599m/", line 17, in <module>
[2023-02-07, 14:06:24 GMT] {} INFO - arg_dict = pickle.load(file)
[2023-02-07, 14:06:24 GMT] {} INFO - AttributeError: Can't get attribute '_unpickle_block' on <module 'pandas._libs.internals' from '/opt/airflow/venv1/lib/python3.8/site-packages/pandas/_libs/'>
[2023-02-07, 14:06:24 GMT] {} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/", line 188, in execute
return_value = super().execute(context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/", line 370, in execute
return super().execute(context=serializable_context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/", line 175, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/", line 678, in execute_callable
return self._execute_python_callable_in_subprocess(python_path, tmp_path)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/", line 426, in _execute_python_callable_in_subprocess
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/", line 168, in execute_in_subprocess
execute_in_subprocess_with_kwargs(cmd, cwd=cwd)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/", line 191, in execute_in_subprocess_with_kwargs
raise subprocess.CalledProcessError(exit_code, cmd)
subprocess.CalledProcessError: Command '['/opt/airflow/venv1/bin/python3', '/tmp/tmddiox599m/', '/tmp/tmddiox599m/', '/tmp/tmddiox599m/script.out', '/tmp/tmddiox599m/string_args.txt']' returned non-zero exit status 1.
[2023-02-07, 14:06:24 GMT] {} INFO - Marking task as FAILED. dag_id=test_global_variable, task_id=task_2, execution_date=20230207T140617, start_date=20230207T140622, end_date=20230207T140624
[2023-02-07, 14:06:24 GMT] {} ERROR - Failed to execute job 74080 for task task_2 (Command '['/opt/airflow/venv1/bin/python3', '/tmp/tmddiox599m/', '/tmp/tmddiox599m/', '/tmp/tmddiox599m/script.out', '/tmp/tmddiox599m/string_args.txt']' returned non-zero exit status 1.; 324831)
[2023-02-07, 14:06:24 GMT] {} INFO - Task exited with return code 1
[2023-02-07, 14:06:24 GMT] {} INFO - 0 downstream tasks scheduled from follow-on schedule check
Try using to_dict() to serialize your DataFrame as a plain dict in the first task,
and recreate the DataFrame from that dict in the second task:
@task.external_python(task_id="task_1", python='/opt/airflow/v1/bin/python3')
def add_to_list(my_list):
    """Build a DataFrame from ``my_list`` and return it as a plain dict.

    The DataFrame is converted with ``to_dict('split')`` so that only
    built-in containers are handed to the next task, rather than a pickled
    pandas object.

    Args:
        my_list: Values used to construct the DataFrame.

    Returns:
        The ``'split'``-oriented dict produced by ``DataFrame.to_dict``.
    """
    import pandas as pd

    df = pd.DataFrame(my_list)
    return df.to_dict('split')  # <- HERE
@task.external_python(task_id="task_2", python='/opt/airflow/v1/bin/python3')
def add_to_list_2(df):
    """Rebuild a DataFrame from its ``'split'`` dict and append the value 19.

    Args:
        df: Dict in ``DataFrame.to_dict('split')`` form; its entries are
            passed to ``pd.DataFrame`` as keyword arguments.

    Returns:
        The rebuilt DataFrame with one extra row holding 19.
    """
    # pandas must be imported inside the function: this file has no
    # module-level pandas import, so ``pd`` would otherwise be undefined here.
    import pandas as pd

    df = pd.DataFrame(**df)  # <- HERE
    # DataFrame.append was removed in pandas 2.0; concatenating with a
    # one-row frame yields the same result on old and new pandas versions.
    df = pd.concat([df, pd.DataFrame([19])])
    # NOTE(review): this still returns a DataFrame object; if the result is
    # consumed by another task, consider returning df.to_dict('split') here
    # as well -- confirm against downstream usage.
    return df
From the Airflow documentation of ExternalPythonOperator (and PythonVirtualenvOperator) on serialization:
Your python callable has to be serializable. There are a number of python objects that are not serializable using standard
pickle library. You can mitigate some of those limitations by using dill
library but even that library does not solve all the serialization limitations.