from airflow.decorators import dag, task
from pendulum import datetime
from datetime import timedelta
my_default_args = {
'owner': 'Anonymus',
# 'email': ['random@random.com'],
# 'email_on_failure': True,
# 'email_on_retry': False, #only allow if it was allowed in the scheduler
# 'retries': 1, #only allow if it was allowed in the scheduler
# 'retry_delay': timedelta(minutes=1),
# 'depends_on_past': False,
}
@dag(
dag_id='test_global_variable',
schedule='12 11 * * *',
start_date=datetime(2023,2,1,tz="UTC"),
catchup=False,
default_args=my_default_args,
tags=['sample_tag', 'sample_tag2'],
)
def write_var():
@task.external_python(task_id="task_1", python='/opt/airflow/v1/bin/python3')
def add_to_list(my_list):
print(my_list)
import pandas as pd
df = pd.DataFrame(my_list)
return df
@task.external_python(task_id="task_2", python='/opt/airflow/v1/bin/python3')
def add_to_list_2(df):
print(df)
df = df.append([19])
return df
add_to_list_2(add_to_list([23, 5, 8]))
write_var()
*** Reading local file: /opt/airflow/logs/dag_id=test_global_variable/run_id=manual__2023-02-07T14:06:17.432734+00:00/task_id=task_2/attempt=1.log
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_2 manual__2023-02-07T14:06:17.432734+00:00 [queued]>
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_2 manual__2023-02-07T14:06:17.432734+00:00 [queued]>
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): task_2> on 2023-02-07 14:06:17.432734+00:00
[2023-02-07, 14:06:22 GMT] {standard_task_runner.py:54} INFO - Started process 324831 to run task
[2023-02-07, 14:06:22 GMT] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_global_variable', 'task_2', 'manual__2023-02-07T14:06:17.432734+00:00', '--job-id', '74080', '--raw', '--subdir', 'DAGS_FOLDER/test_global_variable.py', '--cfg-path', '/tmp/tmpbm8tkk1i']
[2023-02-07, 14:06:22 GMT] {standard_task_runner.py:83} INFO - Job 74080: Subtask task_2
[2023-02-07, 14:06:22 GMT] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_global_variable.py
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {task_command.py:384} INFO - Running <TaskInstance: test_global_variable.task_2 manual__2023-02-07T14:06:17.432734+00:00 [running]> on host 4851b30aa5cf
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=Anonymus
AIRFLOW_CTX_DAG_ID=test_global_variable
AIRFLOW_CTX_TASK_ID=task_2
AIRFLOW_CTX_EXECUTION_DATE=2023-02-07T14:06:17.432734+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-02-07T14:06:17.432734+00:00
[2023-02-07, 14:06:23 GMT] {process_utils.py:179} INFO - Executing cmd: /opt/airflow/venv1/bin/python3 /tmp/tmddiox599m/script.py /tmp/tmddiox599m/script.in /tmp/tmddiox599m/script.out /tmp/tmddiox599m/string_args.txt
[2023-02-07, 14:06:23 GMT] {process_utils.py:183} INFO - Output:
[2023-02-07, 14:06:24 GMT] {process_utils.py:187} INFO - Traceback (most recent call last):
[2023-02-07, 14:06:24 GMT] {process_utils.py:187} INFO - File "/tmp/tmddiox599m/script.py", line 17, in <module>
[2023-02-07, 14:06:24 GMT] {process_utils.py:187} INFO - arg_dict = pickle.load(file)
[2023-02-07, 14:06:24 GMT] {process_utils.py:187} INFO - AttributeError: Can't get attribute '_unpickle_block' on <module 'pandas._libs.internals' from '/opt/airflow/venv1/lib/python3.8/site-packages/pandas/_libs/internals.cpython-38-x86_64-linux-gnu.so'>
[2023-02-07, 14:06:24 GMT] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 188, in execute
return_value = super().execute(context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 370, in execute
return super().execute(context=serializable_context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 678, in execute_callable
return self._execute_python_callable_in_subprocess(python_path, tmp_path)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 426, in _execute_python_callable_in_subprocess
execute_in_subprocess(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/process_utils.py", line 168, in execute_in_subprocess
execute_in_subprocess_with_kwargs(cmd, cwd=cwd)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/process_utils.py", line 191, in execute_in_subprocess_with_kwargs
raise subprocess.CalledProcessError(exit_code, cmd)
subprocess.CalledProcessError: Command '['/opt/airflow/venv1/bin/python3', '/tmp/tmddiox599m/script.py', '/tmp/tmddiox599m/script.in', '/tmp/tmddiox599m/script.out', '/tmp/tmddiox599m/string_args.txt']' returned non-zero exit status 1.
[2023-02-07, 14:06:24 GMT] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_global_variable, task_id=task_2, execution_date=20230207T140617, start_date=20230207T140622, end_date=20230207T140624
[2023-02-07, 14:06:24 GMT] {standard_task_runner.py:102} ERROR - Failed to execute job 74080 for task task_2 (Command '['/opt/airflow/venv1/bin/python3', '/tmp/tmddiox599m/script.py', '/tmp/tmddiox599m/script.in', '/tmp/tmddiox599m/script.out', '/tmp/tmddiox599m/string_args.txt']' returned non-zero exit status 1.; 324831)
[2023-02-07, 14:06:24 GMT] {local_task_job.py:164} INFO - Task exited with return code 1
[2023-02-07, 14:06:24 GMT] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
Try to use to_dict
to serialize your dataframe as dict and recreate it from the other task:
@task.external_python(task_id="task_1", python='/opt/airflow/v1/bin/python3')
def add_to_list(my_list):
print(my_list)
import pandas as pd
df = pd.DataFrame(my_list)
return df.to_dict('split') # <- HERE
@task.external_python(task_id="task_2", python='/opt/airflow/v1/bin/python3')
def add_to_list_2(df):
df = pd.DataFrame(**df) # <- HERE
print(df)
df = df.append([19])
return df
From the Airflow documentation of ExternalPythonOperator (and VirtualPythonOperator) about serialization.
Your python callable has to be serializable. There are a number of python objects that are not serializable using standard
pickle
library. You can mitigate some of those limitations by usingdill
library but even that library does not solve all the serialization limitations.