
Airflow - TaskFlow, Dynamic Task Mapping with multiple outputs


Let's say I have an Airflow (2.3) DAG that looks like this:

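from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.providers.amazon.aws.operators.athena import AthenaOperator

@task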
def retrieve_ingest_setup_queries():
    settings = Variable.get("athena_settings", deserialize_json=True)
    # settings = {'drops': ["DROP TABLE my_table", "DROP TABLE my_table2"],
    #             'creates': ["CREATE TABLE ...", ...]}
    return settings

@dag(
    dag_id='athena_something',
    default_args=default_args,
    schedule_interval=None,
    render_template_as_native_obj=True,
)
def somedag():
    ingest_setup = retrieve_ingest_setup_queries()

    ingest_db_setup_drops = AthenaOperator.partial(
        task_id='db_drops',
        database="{{ var.json.athena.database }}",
        output_location="{{ var.json.athena.output_location }}",
        aws_conn_id='aws_athena'
    ).expand(query=ingest_setup??????)

    ingest_db_setup_creates = AthenaOperator.partial(
        task_id='db_creates',
        database="{{ var.json.athena.database }}",
        output_location="{{ var.json.athena.output_location }}",
        aws_conn_id='aws_athena'
    ).expand(query=ingest_setup??????)

I am looking for a way to set "query" in the expand method as ingest_setup['drops'] for my first operator and as ingest_setup['creates'] for the second.

I could use two different retrieval functions, but I'd like to use only one. I want to use the TaskFlow API if at all possible. expand() doesn't support templating, so I don't see how, or whether, this can be done (see ?????? in the code).


Solution

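  • I needed to set multiple_outputs=True on the task decorator, i.e. @task(multiple_outputs=True).
    Each key of the returned dictionary is then pushed as its own XCom, so ingest_setup['drops'] and ingest_setup['creates'] work as intended in expand().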

    This only works with task decorators, though. Accessing a key of a dictionary that is an operator's result (XComArg) is far from intuitive. It is discussed here.

    Based on that discussion, I created the following class for operator results:

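    from airflow.models.baseoperator import BaseOperator

    class XcomDict:
        """Builds Jinja templates that index into an operator's dict result."""

        def __init__(self, operator: BaseOperator):
            # str(operator.output) is a "{{ ... }}" template; keep only the inner expression.
            self.operator_output = str(operator.output).strip("{ }")

        def __getitem__(self, item: str) -> str:
            # Re-wrap the expression with a ['item'] lookup appended.
            return f"{{{{ {self.operator_output}['{item}'] }}}}"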
    

    Of course, it assumes the operator's return value is a dictionary. I use it this way:

        job = CreateJobOperator(
            task_id='create_job', ...)

        wait = WaitForJobOperator(
            task_id='wait_for_job_to_complete',
            job_id=XcomDict(job)['JobId'], ...)
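
To see what XcomDict actually hands to the downstream operator, here is a plain-Python sketch that runs without Airflow. FakeOperator is a hypothetical stand-in, and the string its output property returns is an assumption about how str(operator.output) is formatted in Airflow 2.x. The task and DAG ids are made up for illustration.

```python
from dataclasses import dataclass


@dataclass
class FakeOperator:
    """Hypothetical stand-in for an Airflow operator; it only mimics `.output`."""

    task_id: str
    dag_id: str

    @property
    def output(self) -> str:
        # Assumed shape of str(XComArg) in Airflow 2.x (not taken from the source).
        return (
            "{{ task_instance.xcom_pull("
            f"task_ids='{self.task_id}', dag_id='{self.dag_id}', key='return_value'"
            ") }}"
        )


class XcomDict:
    """Same logic as the class above, minus the Airflow type hint."""

    def __init__(self, operator):
        # Drop the surrounding "{{ " and " }}" to keep the bare Jinja expression.
        self.operator_output = str(operator.output).strip("{ }")

    def __getitem__(self, item: str) -> str:
        # Append a dictionary lookup and wrap the expression in "{{ }}" again.
        return f"{{{{ {self.operator_output}['{item}'] }}}}"


job = FakeOperator(task_id="create_job", dag_id="example_dag")
job_id_template = XcomDict(job)["JobId"]
print(job_id_template)
```

The result is only a template string. Jinja resolves it when the task runs, so this approach should only work when the receiving argument (job_id here) is one of the operator's templated fields.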