I have been upgrading from Airflow 2.2.2 to 2.8.1. After relatively minor modifications, all of my DAGs parse except for one.
Using the TaskFlow API, I have a task with this structure:
@task(multiple_outputs=True)
def decorated_task(
    inputs: list[dict],
    logical_date: str,
    project_id: str,
    bucket_name: str,
) -> dict[str, str]:
    # Implementation details
    return {}
It is called like this in the DAG:
@dag(
    dag_id=dag_id,
    # ...
    default_args={
        "depends_on_past": False,
        "retries": 3,
        "retry_delay": timedelta(hours=1),
    },
)
def decorated_dag():
    # ...
    task_date = "{{ (dag_run.logical_date - macros.timedelta(days=1)) | ds_nodash }}"
    # ...
    decorated_task_result = decorated_task(
        inputs=inputs,
        logical_date=task_date,
        project_id=PROJECT_ID,
        bucket_name=BUCKET_NAME,
    )
I'm getting this error in the scheduler:
ValueError: non-default argument follows default argument
with this traceback:
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/dag.py", line 3944, in factory
f(**f_kwargs)
File "/usr/local/airflow/dags/dag_file.py", line 160, in decorated_dag
decorated_task_result = decorated_task(
^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", line 366, in __call__
op = self.operator_class(
^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 437, in apply_defaults
result = func(self, **kwargs, default_args=default_args)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/decorators/python.py", line 52, in __init__
super().__init__(
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 437, in apply_defaults
result = func(self, **kwargs, default_args=default_args)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", line 217, in __init__
signature = signature.replace(parameters=parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/inspect.py", line 3052, in replace
return type(self)(parameters,
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/inspect.py", line 3008, in __init__
raise ValueError(msg)
ValueError: non-default argument follows default argument
As the stack trace suggests, the problem is something internal to Airflow. I couldn't find anything similar on the Internet or in Airflow's GitHub issues.
As the stack trace shows, the error comes from this code in the DecoratedOperator class in decorators/base.py:
# Check the decorated function's signature. We go through the argument
# list and "fill in" defaults to arguments that are known context keys,
# since values for those will be provided when the task is run. Since
# we're not actually running the function, None is good enough here.
signature = inspect.signature(python_callable)
parameters = [
    param.replace(default=None) if param.name in KNOWN_CONTEXT_KEYS else param
    for param in signature.parameters.values()
]
signature = signature.replace(parameters=parameters)
The KNOWN_CONTEXT_KEYS variable is a large set containing the names of the Airflow context variables:
KNOWN_CONTEXT_KEYS = {
    "conf",
    # ...
    "logical_date",
    # ...
}
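The failure can be reproduced with plain inspect, outside of Airflow entirely. The following is just an illustrative sketch with a throwaway function named example, mimicking what DecoratedOperator does to the signature:
import inspect

# Throwaway callable with the same parameter order as decorated_task.
def example(inputs: list[dict], logical_date: str, project_id: str, bucket_name: str):
    return {}

signature = inspect.signature(example)
parameters = [
    # Like Airflow, give the context-key parameter a default of None.
    param.replace(default=None) if param.name == "logical_date" else param
    for param in signature.parameters.values()
]

try:
    signature.replace(parameters=parameters)
except ValueError as exc:
    # logical_date now has a default, but project_id and bucket_name
    # after it do not, which Python's Signature rejects.
    print(exc)  # non-default argument follows default argument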
Our task function signature uses logical_date, which is one of these context keys, so Airflow fills in a default of None for it. Because project_id and bucket_name come after it without defaults, rebuilding the signature violates Python's rule that a parameter without a default cannot follow one with a default. From here we can deduce that moving logical_date to the end of the decorated_task parameter list fixes the issue:
@task(multiple_outputs=True)
def decorated_task(
    inputs: list[dict],
    project_id: str,
    bucket_name: str,
    logical_date: str,
) -> dict[str, str]:
    # Implementation details
    return {}
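With the reordered parameters, the same signature manipulation goes through, because the only parameter that receives a default is now last. Again just a sketch with a throwaway function:
import inspect

def reordered(inputs: list[dict], project_id: str, bucket_name: str, logical_date: str):
    return {}

signature = inspect.signature(reordered)
parameters = [
    param.replace(default=None) if param.name == "logical_date" else param
    for param in signature.parameters.values()
]
# No ValueError: the defaulted parameter is last.
print(signature.replace(parameters=parameters))
# (inputs: list[dict], project_id: str, bucket_name: str, logical_date: str = None)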
I made a PR to Airflow to improve the error message in these cases, so hopefully it will be clearer starting with a later 2.8 patch release or 2.9.