I'm using Airflow 2.5.3 with the Kubernetes executor and Python 3.7.
I've tried to make a simple DAG with a single PythonVirtualenvOperator and two context variables ({{ ts }} and {{ dag }}) passed into it.
from datetime import timedelta
from pathlib import Path

import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
import pendulum

dag = DAG(
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=10),
    },
    dag_id='fs_rb_cashflow_test5',
    schedule_interval='0 5 * * 1',
    start_date=pendulum.datetime(2020, 1, 1, tz='UTC'),
    catchup=False,
    tags=['Feature Store', 'RB', 'u_m1ahn'],
    render_template_as_native_obj=True,
)

context = {"ts": "{{ ts }}", "dag": "{{ dag }}"}
op_args = [context, Path(__file__).parent.absolute()]


def make_foo(*args, **kwargs):
    print("---> making foo!")
    print("make foo(...): args")
    print(args)
    print("make foo(...): kwargs")
    print(kwargs)


make_foo_task = PythonVirtualenvOperator(
    task_id='make_foo',
    python_callable=make_foo,
    provide_context=True,
    use_dill=True,
    system_site_packages=False,
    op_args=op_args,
    op_kwargs={
        "execution_date_str": '{{ execution_date }}',
    },
    requirements=["dill", "pytz", f"apache-airflow=={airflow.__version__}", "psycopg2-binary >= 2.9, < 3"],
    dag=dag)
Alas, when I trigger this DAG, Airflow gives me the following error:
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - Traceback (most recent call last):
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - File "/tmp/venv5ifve2a5/script.py", line 17, in <module>
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - arg_dict = dill.load(file)
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - File "/tmp/venv5ifve2a5/lib/python3.7/site-packages/dill/_dill.py", line 287, in load
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - return Unpickler(file, ignore=ignore, **kwds).load()
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - File "/tmp/venv5ifve2a5/lib/python3.7/site-packages/dill/_dill.py", line 442, in load
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - obj = StockUnpickler.load(self)
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - File "/tmp/venv5ifve2a5/lib/python3.7/site-packages/dill/_dill.py", line 432, in find_class
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - return StockUnpickler.find_class(self, module, name)
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - ModuleNotFoundError: No module named 'unusual_prefix_4c3a45107010a4223aa054ffc5f7bffc78cce4e7_dag'
Why does it give me this strange error -- and how can it be fixed?
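This problem occurs when the function make_foo, which is passed to an Airflow operator as the python_callable argument, is defined in the same Python source file as the DAG object.
The module named in the traceback (unusual_prefix_..._dag) appears to be the name under which Airflow imports the DAG file. The pickled arguments refer to that module, but nothing with that name can be imported inside the virtualenv.
The DAG finally started working once I moved the make_foo function to another Python module.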
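To illustrate why a missing module breaks unpickling, here is a minimal sketch that uses only the standard-library pickle module rather than dill or Airflow. It is a simulation under assumptions, not what Airflow does internally: the module name unusual_prefix_0123abcd_dag and the helper roundtrip_without_module are made up for the example. The point is that a function pickled by reference stores just its module and function names, so loading it fails wherever that module cannot be imported.

```python
import pickle
import sys
import types

# Hypothetical stand-in for the mangled module name seen in the traceback.
MANGLED_NAME = "unusual_prefix_0123abcd_dag"


def roundtrip_without_module():
    """Pickle a function by reference, then unpickle where its module is gone."""
    # "DAG parsing side": a module exists under the mangled name and defines make_foo.
    module = types.ModuleType(MANGLED_NAME)
    exec("def make_foo(*args, **kwargs):\n    return 'foo'\n", module.__dict__)
    sys.modules[MANGLED_NAME] = module

    # The payload stores only the module and function names, not the function body.
    payload = pickle.dumps(module.make_foo)

    # "Virtualenv side": no module with that name can be imported any more.
    del sys.modules[MANGLED_NAME]
    try:
        pickle.loads(payload)
    except ModuleNotFoundError as exc:
        return payload, str(exc)
    return payload, None


if __name__ == "__main__":
    _, error = roundtrip_without_module()
    print(error)
```

Running it prints a ModuleNotFoundError message that names the made-up module, which mirrors the last line of the traceback. A function that lives in a regular package importable by its real name does not hit this, as long as that package is also importable in the environment where the payload is loaded.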
Here's my code now:
dags/strange_pickling_error/dag.py:

import datetime

import pendulum
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
import dill

from strange_pickling_error.some_moar_code import make_foo

dag = DAG(
    dag_id='strange_pickling_error_dag',
    schedule_interval='0 5 * * 1',
    start_date=datetime.datetime(2020, 1, 1),
    catchup=False,
    render_template_as_native_obj=True,
)

context = {"ts": "{{ ts }}", "dag_run": "{{ dag_run }}"}

make_foo_task = PythonVirtualenvOperator(
    task_id='make_foo',
    python_callable=make_foo,
    use_dill=True,
    system_site_packages=False,
    op_args=[context],
    requirements=[f"dill=={dill.__version__}", f"apache-airflow=={airflow.__version__}", "psycopg2-binary >= 2.9, < 3",
                  f"pendulum=={pendulum.__version__}", "lazy-object-proxy"],
    dag=dag)

dags/strange_pickling_error/some_moar_code.py:

def make_foo(*args, **kwargs):
    print("---> making foo!")
    print("make foo(...): args")
    print(args)
    print("make foo(...): kwargs")
    print(kwargs)