Tags: airflow, airflow-taskflow

"ValueError: non-default argument follows default argument" when upgrading from Airflow 2.2.2 to 2.8.1


I have been upgrading from Airflow 2.2.2 to 2.8.1. After relatively minor modifications, all of my DAGs parse, except for one.

Using the TaskFlow API, I have a task with this structure:

@task(multiple_outputs=True)
def decorated_task(
    inputs: list[dict],
    logical_date: str,
    project_id: str,
    bucket_name: str,
) -> dict[str, str]:
    # Implementation details
    return {}

Being called like this in the DAG:

@dag(
    dag_id=dag_id,
    # ...
    default_args={
        "depends_on_past": False,
        "retries": 3,
        "retry_delay": timedelta(hours=1),
    },
)
def decorated_dag():
    # ...
    task_date = "{{ (dag_run.logical_date - macros.timedelta(days=1)) | ds_nodash }}"
    # ...
    decorated_task_result = decorated_task(
        inputs=inputs,
        logical_date=task_date,
        project_id=PROJECT_ID,
        bucket_name=BUCKET_NAME,
    )

I'm getting this error in the scheduler:

ValueError: non-default argument follows default argument

with trace:


  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/dag.py", line 3944, in factory
    f(**f_kwargs)
  File "/usr/local/airflow/dags/dag_file.py", line 160, in decorated_dag
    decorated_task_result = decorated_task(
                            ^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", line 366, in __call__
    op = self.operator_class(
         ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 437, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/decorators/python.py", line 52, in __init__
    super().__init__(
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 437, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", line 217, in __init__
    signature = signature.replace(parameters=parameters)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/inspect.py", line 3052, in replace
    return type(self)(parameters,
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/inspect.py", line 3008, in __init__
    raise ValueError(msg)
ValueError: non-default argument follows default argument

The stack trace suggests the error comes from somewhere inside Airflow itself. I couldn't find anything similar on the Internet or in Airflow's GitHub issues.


Solution

  • As the stack trace says, the error comes from this code in the DecoratedOperator class in decorators/base.py:

    # Check the decorated function's signature. We go through the argument
    # list and "fill in" defaults to arguments that are known context keys,
    # since values for those will be provided when the task is run. Since
    # we're not actually running the function, None is good enough here.
    signature = inspect.signature(python_callable)
    parameters = [
        param.replace(default=None) if param.name in KNOWN_CONTEXT_KEYS else param
        for param in signature.parameters.values()
    ]
    signature = signature.replace(parameters=parameters)
    

    The KNOWN_CONTEXT_KEYS variable is a long set that represents the Airflow context variables:

    KNOWN_CONTEXT_KEYS = {
        "conf",
        # ...
        "logical_date",
        # ...
        
    }
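
    The failure can be reproduced with plain `inspect`, outside Airflow entirely. This is a minimal sketch: the parameter order mirrors the failing task, and `KNOWN_CONTEXT_KEYS` is reduced to just `logical_date` for illustration.

```python
import inspect

# Same parameter order as the failing task: a context key in the middle.
def decorated_task(inputs, logical_date, project_id, bucket_name):
    return {}

signature = inspect.signature(decorated_task)
# Mimic what DecoratedOperator does: give known context keys a default.
parameters = [
    param.replace(default=None) if param.name == "logical_date" else param
    for param in signature.parameters.values()
]
# logical_date now has a default, but project_id and bucket_name (which
# come after it) do not -- rebuilding the signature violates Python's
# parameter-ordering rule and raises the same ValueError.
try:
    signature.replace(parameters=parameters)
except ValueError as exc:
    print(exc)  # non-default argument follows default argument
```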
    

    The logical_date context variable appears in our task function's signature. Airflow therefore gives it a default of None, which turns every parameter after it (project_id, bucket_name) into a non-default argument following a default one. Moving that parameter to the end of the decorated_task parameter list fixes the issue.

    @task(multiple_outputs=True)
    def decorated_task(
        inputs: list[dict],
        project_id: str,
        bucket_name: str,
        logical_date: str,
    ) -> dict[str, str]:
        # Implementation details
        return {}
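
    As a sanity check, the same plain-`inspect` rebuild from above now succeeds with the reordered signature, since only the trailing `logical_date` receives a default:

```python
import inspect

# Context key moved to the end, as in the fixed task above.
def decorated_task(inputs, project_id, bucket_name, logical_date):
    return {}

signature = inspect.signature(decorated_task)
parameters = [
    param.replace(default=None) if param.name == "logical_date" else param
    for param in signature.parameters.values()
]
# No non-default parameter follows logical_date, so this no longer raises.
signature = signature.replace(parameters=parameters)
print(signature)  # (inputs, project_id, bucket_name, logical_date=None)
```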
    

    Update

    I made a PR to Airflow to improve the error message in these cases, so hopefully it becomes clearer starting with a later 2.8 patch release, or 2.9.

    https://github.com/apache/airflow/pull/38015