python kubernetes airflow pickle dill

PythonVirtualenvOperator gives error: No module named unusual_prefix_***_dag


I'm using Airflow 2.5.3 with the Kubernetes executor and Python 3.7.

I've made a simple DAG with a single PythonVirtualenvOperator that receives two context variables ({{ ts }} and {{ dag }}).

from datetime import timedelta
from pathlib import Path
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
import pendulum
 
 
dag = DAG(
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=10),
    },
    dag_id='fs_rb_cashflow_test5',
    schedule_interval='0 5 * * 1',
    start_date=pendulum.datetime(2020, 1, 1, tz='UTC'),
    catchup=False,
    tags=['Feature Store', 'RB', 'u_m1ahn'],
    render_template_as_native_obj=True,
)
 
context = {"ts": "{{ ts }}", "dag": "{{ dag }}"}
op_args = [context, Path(__file__).parent.absolute()]
 
 
def make_foo(*args, **kwargs):
    print("---> making foo!")
    print("make foo(...): args")
    print(args)
    print("make foo(...): kwargs")
    print(kwargs)
 
 
make_foo_task = PythonVirtualenvOperator(
        task_id='make_foo',
        python_callable=make_foo,
        provide_context=True,
        use_dill=True,
        system_site_packages=False,
        op_args=op_args,
        op_kwargs={
          "execution_date_str": '{{ execution_date }}',
        },
        requirements=["dill", "pytz", f"apache-airflow=={airflow.__version__}", "psycopg2-binary >= 2.9, < 3"],
        dag=dag)

Alas, when I trigger this DAG, Airflow fails with the following error:

[2023-10-23, 13:30:40] {process_utils.py:187} INFO - Traceback (most recent call last):
[2023-10-23, 13:30:40] {process_utils.py:187} INFO -   File "/tmp/venv5ifve2a5/script.py", line 17, in <module>
[2023-10-23, 13:30:40] {process_utils.py:187} INFO -     arg_dict = dill.load(file)
[2023-10-23, 13:30:40] {process_utils.py:187} INFO -   File "/tmp/venv5ifve2a5/lib/python3.7/site-packages/dill/_dill.py", line 287, in load
[2023-10-23, 13:30:40] {process_utils.py:187} INFO -     return Unpickler(file, ignore=ignore, **kwds).load()
[2023-10-23, 13:30:40] {process_utils.py:187} INFO -   File "/tmp/venv5ifve2a5/lib/python3.7/site-packages/dill/_dill.py", line 442, in load
[2023-10-23, 13:30:40] {process_utils.py:187} INFO -     obj = StockUnpickler.load(self)
[2023-10-23, 13:30:40] {process_utils.py:187} INFO -   File "/tmp/venv5ifve2a5/lib/python3.7/site-packages/dill/_dill.py", line 432, in find_class
[2023-10-23, 13:30:40] {process_utils.py:187} INFO -     return StockUnpickler.find_class(self, module, name)
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - ModuleNotFoundError: No module named 'unusual_prefix_4c3a45107010a4223aa054ffc5f7bffc78cce4e7_dag'

Why does it give me this strange error, and how can it be fixed?


Solution

  • This problem occurs when the function passed to the operator as the python_callable argument (here make_foo) is defined in the same Python file as the DAG object.

    The DAG finally started working once I moved make_foo into a separate Python module.

    Here's my code now:

    • dags/strange_pickling_error/dag.py:
    import datetime
    import pendulum
    import airflow
    from airflow import DAG
    from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
    import dill
    
    from strange_pickling_error.some_moar_code import make_foo
    
    
    dag = DAG(
        dag_id='strange_pickling_error_dag',
        schedule_interval='0 5 * * 1',
        start_date=datetime.datetime(2020, 1, 1),
        catchup=False,
        render_template_as_native_obj=True,
    )
    
    
    context = {"ts": "{{ ts }}", "dag_run": "{{ dag_run }}"}
    
    
    make_foo_task = PythonVirtualenvOperator(
        task_id='make_foo',
        python_callable=make_foo,
        use_dill=True,
        system_site_packages=False,
        op_args=[context],
        requirements=[f"dill=={dill.__version__}", f"apache-airflow=={airflow.__version__}", "psycopg2-binary >= 2.9, < 3",
                      f"pendulum=={pendulum.__version__}", "lazy-object-proxy"],
        dag=dag)
    
    • dags/strange_pickling_error/some_moar_code.py:
    def make_foo(*args, **kwargs):
        print("---> making foo!")
        print("make foo(...): args")
        print(args)
        print("make foo(...): kwargs")
        print(kwargs)
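The failure mode can be reproduced without Airflow. As far as I can tell, Airflow imports each DAG file under a generated module name, unusual_prefix_<SHA-1 of the file path>_<file name without .py>, which is where the name in the traceback comes from. The virtualenv's script.py then unpickles op_args and op_kwargs in a separate interpreter where that module was never imported, and a function defined in the DAG file is pickled only as a reference (module name plus function name). In the original DAG, {{ dag }} rendered with render_template_as_native_obj=True is presumably the DAG object itself, which reaches make_foo through its task; that would explain how the function ended up in the pickled arguments. The sketch below is a rough stand-in under stated assumptions: it uses the standard-library pickle instead of dill (dill also pickles functions from importable, non-__main__ modules by reference), a plain subprocess instead of the virtualenv, and a made-up module name and a hypothetical helper, unpickles_in_fresh_interpreter.

```python
import pickle
import subprocess
import sys
import types


def unpickles_in_fresh_interpreter(obj) -> bool:
    """Hypothetical helper: pickle `obj` here, then try to unpickle it in a
    brand-new Python process, roughly like the virtualenv's script.py does."""
    payload = pickle.dumps(obj)
    result = subprocess.run(
        [sys.executable, "-c", "import pickle, sys; pickle.loads(sys.stdin.buffer.read())"],
        input=payload,
        capture_output=True,
    )
    # A non-zero exit code means unpickling raised (e.g. ModuleNotFoundError).
    return result.returncode == 0


# Simulate a DAG file imported under a generated module name (name is illustrative).
MOD_NAME = "unusual_prefix_0000_dag"
dag_module = types.ModuleType(MOD_NAME)
exec("def make_foo(*args, **kwargs):\n    print('---> making foo!')\n", dag_module.__dict__)
sys.modules[MOD_NAME] = dag_module  # importable here, but not in the child process

# Plain template results (strings) travel fine.
print(unpickles_in_fresh_interpreter({"ts": "2023-10-23T13:30:40+00:00"}))  # True
# A function from the "DAG module" is stored by reference and cannot be found.
print(unpickles_in_fresh_interpreter({"callable": dag_module.make_foo}))  # False
```

The same kind of check could be run on whatever you pass as op_args or op_kwargs before deploying. It only approximates the real setup, since the actual virtualenv has its own sys.path and installed packages.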