
How to use a Python list as a global variable (pandas DataFrame) within @task.external_python?


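Goal

Create a pandas DataFrame from a Python list in one @task.external_python task and append a value to it in a second task that runs in the same external virtualenv.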

CODE

from airflow.decorators import dag, task
from pendulum import datetime
from datetime import timedelta


my_default_args = {
    'owner': 'Anonymus',
    # 'email': ['random@random.com'],
    # 'email_on_failure': True,
    # 'email_on_retry': False, #only allow if it was allowed in the scheduler
    # 'retries': 1, #only allow if it was allowed in the scheduler
    # 'retry_delay': timedelta(minutes=1),
    # 'depends_on_past': False,
}

@dag(
    dag_id='test_global_variable',
    schedule='12 11 * * *',
    start_date=datetime(2023,2,1,tz="UTC"),
    catchup=False,
    default_args=my_default_args,
    tags=['sample_tag', 'sample_tag2'],
    )
def write_var():

    @task.external_python(task_id="task_1", python='/opt/airflow/v1/bin/python3')
    def add_to_list(my_list):
        print(my_list)
        import pandas as pd
        df = pd.DataFrame(my_list)
        return df


    @task.external_python(task_id="task_2", python='/opt/airflow/v1/bin/python3')
    def add_to_list_2(df):
        print(df)
        df = df.append([19])
        return df

    add_to_list_2(add_to_list([23, 5, 8]))


write_var()

ERROR LOG of the 2nd task (task_2)

*** Reading local file: /opt/airflow/logs/dag_id=test_global_variable/run_id=manual__2023-02-07T14:06:17.432734+00:00/task_id=task_2/attempt=1.log
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_2 manual__2023-02-07T14:06:17.432734+00:00 [queued]>
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_2 manual__2023-02-07T14:06:17.432734+00:00 [queued]>
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1362} INFO - 
--------------------------------------------------------------------------------
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1364} INFO - 
--------------------------------------------------------------------------------
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): task_2> on 2023-02-07 14:06:17.432734+00:00
[2023-02-07, 14:06:22 GMT] {standard_task_runner.py:54} INFO - Started process 324831 to run task
[2023-02-07, 14:06:22 GMT] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_global_variable', 'task_2', 'manual__2023-02-07T14:06:17.432734+00:00', '--job-id', '74080', '--raw', '--subdir', 'DAGS_FOLDER/test_global_variable.py', '--cfg-path', '/tmp/tmpbm8tkk1i']
[2023-02-07, 14:06:22 GMT] {standard_task_runner.py:83} INFO - Job 74080: Subtask task_2
[2023-02-07, 14:06:22 GMT] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_global_variable.py
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {task_command.py:384} INFO - Running <TaskInstance: test_global_variable.task_2 manual__2023-02-07T14:06:17.432734+00:00 [running]> on host 4851b30aa5cf
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=Anonymus
AIRFLOW_CTX_DAG_ID=test_global_variable
AIRFLOW_CTX_TASK_ID=task_2
AIRFLOW_CTX_EXECUTION_DATE=2023-02-07T14:06:17.432734+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-02-07T14:06:17.432734+00:00
[2023-02-07, 14:06:23 GMT] {process_utils.py:179} INFO - Executing cmd: /opt/airflow/venv1/bin/python3 /tmp/tmddiox599m/script.py /tmp/tmddiox599m/script.in /tmp/tmddiox599m/script.out /tmp/tmddiox599m/string_args.txt
[2023-02-07, 14:06:23 GMT] {process_utils.py:183} INFO - Output:
[2023-02-07, 14:06:24 GMT] {process_utils.py:187} INFO - Traceback (most recent call last):
[2023-02-07, 14:06:24 GMT] {process_utils.py:187} INFO -   File "/tmp/tmddiox599m/script.py", line 17, in <module>
[2023-02-07, 14:06:24 GMT] {process_utils.py:187} INFO -     arg_dict = pickle.load(file)
[2023-02-07, 14:06:24 GMT] {process_utils.py:187} INFO - AttributeError: Can't get attribute '_unpickle_block' on <module 'pandas._libs.internals' from '/opt/airflow/venv1/lib/python3.8/site-packages/pandas/_libs/internals.cpython-38-x86_64-linux-gnu.so'>
[2023-02-07, 14:06:24 GMT] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 188, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 370, in execute
    return super().execute(context=serializable_context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 678, in execute_callable
    return self._execute_python_callable_in_subprocess(python_path, tmp_path)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 426, in _execute_python_callable_in_subprocess
    execute_in_subprocess(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/process_utils.py", line 168, in execute_in_subprocess
    execute_in_subprocess_with_kwargs(cmd, cwd=cwd)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/process_utils.py", line 191, in execute_in_subprocess_with_kwargs
    raise subprocess.CalledProcessError(exit_code, cmd)
subprocess.CalledProcessError: Command '['/opt/airflow/venv1/bin/python3', '/tmp/tmddiox599m/script.py', '/tmp/tmddiox599m/script.in', '/tmp/tmddiox599m/script.out', '/tmp/tmddiox599m/string_args.txt']' returned non-zero exit status 1.
[2023-02-07, 14:06:24 GMT] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_global_variable, task_id=task_2, execution_date=20230207T140617, start_date=20230207T140622, end_date=20230207T140624
[2023-02-07, 14:06:24 GMT] {standard_task_runner.py:102} ERROR - Failed to execute job 74080 for task task_2 (Command '['/opt/airflow/venv1/bin/python3', '/tmp/tmddiox599m/script.py', '/tmp/tmddiox599m/script.in', '/tmp/tmddiox599m/script.out', '/tmp/tmddiox599m/string_args.txt']' returned non-zero exit status 1.; 324831)
[2023-02-07, 14:06:24 GMT] {local_task_job.py:164} INFO - Task exited with return code 1
[2023-02-07, 14:06:24 GMT] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
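The traceback shows pickle.load failing inside the virtualenv interpreter (/opt/airflow/venv1/bin/python3) while looking up _unpickle_block in pandas._libs.internals. The task arguments are pickled by Airflow's own interpreter and unpickled by the virtualenv's, so this error usually points to the two environments having different pandas versions. A minimal sketch for comparing them; the helper name pandas_version is hypothetical and the virtualenv path is taken from the log:

```python
import subprocess
import sys


def pandas_version(python: str) -> str:
    """Return the pandas version importable by the given Python interpreter.

    Hypothetical helper: it runs a one-liner in the target interpreter and
    captures its stdout. Raises CalledProcessError if pandas is missing there.
    """
    result = subprocess.run(
        [python, "-c", "import pandas; print(pandas.__version__)"],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()
```

Run it with the interpreter Airflow itself uses and compare pandas_version(sys.executable) with pandas_version("/opt/airflow/venv1/bin/python3"). If the versions differ, passing a DataFrame object between the two environments by pickle is fragile.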

Solution

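  • The failing pickle.load in the traceback most likely means the DataFrame was pickled by a different pandas version than the one installed in the virtualenv. Avoid passing the DataFrame object itself: use to_dict to serialize it as a plain dict and recreate the DataFrame in the other task. Note that the callable's imports must be inside the function, because only the function's source is executed in the external interpreter: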

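    @task.external_python(task_id="task_1", python='/opt/airflow/v1/bin/python3')
    def add_to_list(my_list):
        print(my_list)
        import pandas as pd  # must be imported inside the callable
        df = pd.DataFrame(my_list)
        return df.to_dict('split')  # <- HERE: return a plain dict, not a DataFrame


    @task.external_python(task_id="task_2", python='/opt/airflow/v1/bin/python3')
    def add_to_list_2(df):
        import pandas as pd  # <- was missing: pd is otherwise undefined in the venv
        df = pd.DataFrame(**df)  # <- HERE: rebuild the DataFrame from the dict
        print(df)
        df = pd.concat([df, pd.DataFrame([19])])  # DataFrame.append was removed in pandas 2.0
        return df.to_dict('split')  # return a dict again so no DataFrame crosses the boundary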
    

    From the Airflow documentation of ExternalPythonOperator (and PythonVirtualenvOperator) about serialization:

    Your python callable has to be serializable. There are a number of python objects that are not serializable using standard pickle library. You can mitigate some of those limitations by using dill library but even that library does not solve all the serialization limitations.
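The to_dict('split') / pd.DataFrame(**...) round trip can be checked without Airflow. In this minimal sketch, pickle.dumps/pickle.loads stand in for the hand-off between the two tasks (an assumption for illustration; Airflow's actual transport is not reproduced). It shows that the pickled DataFrame references pandas modules while the pickled dict does not, so unpickling the dict does not depend on the pandas version installed on the other side. This holds for simple numeric data like the example; datetime columns, for instance, come back from to_dict as pandas Timestamp objects.

```python
import pickle

import pandas as pd

# Task 1: build the DataFrame and convert it to plain Python containers.
df = pd.DataFrame([23, 5, 8])
payload = df.to_dict("split")  # keys: 'index', 'columns', 'data'

# A pickled DataFrame names pandas classes/functions; the dict of ints does not.
assert b"pandas" in pickle.dumps(df)
assert b"pandas" not in pickle.dumps(payload)

# Task 2: rebuild the DataFrame from the (unpickled) payload and append a row.
restored = pd.DataFrame(**pickle.loads(pickle.dumps(payload)))
restored = pd.concat([restored, pd.DataFrame([19])])  # replaces DataFrame.append

print(restored[0].tolist())  # [23, 5, 8, 19]
```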