
Can you resolve a connection id for an operator based on parameters set when triggering a DAG in Airflow?


I have a DAG that transfers files from an FTP server to an Azure Blob. I would like to resolve the connections based on parameters when the DAG is triggered. For example, let's say there are two different FTP servers and there is a connection for each already configured. I want to allow the user triggering the DAG run to specify which connection to use when they trigger it.

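from airflow import DAG
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator

default_args = {}  # placeholder: supply your own default task arguments

with DAG(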
    "example_ftp_to_blob",
    default_args=default_args,
    schedule=None,
    catchup=False,
) as dag:
    ftp_get = FTPFileTransmitOperator(
        task_id="ftp_get",
        ftp_conn_id="my-ftp-con",
        local_filepath="/tmp/my-file",
        remote_filepath="/my-file",
        operation=FTPOperation.GET,
        create_intermediate_dirs=True,
    )

    blob_put = LocalFilesystemToWasbOperator(
        task_id="blob_put",
        wasb_conn_id="my_blob_conn",
        file_path="/tmp/my-file",
        container_name="incoming",
        blob_name="my-file",
    )

    ftp_get >> blob_put

Since the params aren't known when the DAG is parsed, I'm not clear on how (or even if) I can pass those values to the operators.


Solution

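  • You normally use Jinja templates to evaluate parameters at runtime. Only the operator attributes listed in its template_fields are rendered, just before the task executes.

    The ftp_conn_id argument of FTPFileTransmitOperator is not a templated field, so a value like "{{ params.ftp_conn_id }}" would reach the FTP hook literally and the connection lookup would fail. Fortunately, the operator is easy to extend; the Airflow docs include an example of exactly this, adding a templated field to an existing operator: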

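    from typing import Sequence

    from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator


    class BetterFTPFileTransmitOperator(FTPFileTransmitOperator):
        # Keep the parent's templated fields and add ftp_conn_id so it is rendered at runtime
        template_fields: Sequence[str] = (*FTPFileTransmitOperator.template_fields, "ftp_conn_id")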

    Now we are ready to parametrize the connection with a DAG-level param, whose default the user can override when triggering the run:

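    from airflow import DAG
    from airflow.models.param import Param
    from airflow.providers.ftp.operators.ftp import FTPOperation

    # default_args as in the question; BetterFTPFileTransmitOperator as defined above
    with DAG(
        "example_ftp_to_blob",
        default_args=default_args,
        schedule=None,
        catchup=False,
        # Default connection ID; a value supplied in the trigger conf overrides it
        params={"ftp_conn_id": Param("default_ftp_conn", type="string")},
    ) as dag:
        ftp_get = BetterFTPFileTransmitOperator(
            task_id="ftp_get",
            # Rendered at runtime because ftp_conn_id is now a template field
            ftp_conn_id="{{ params.ftp_conn_id }}",
            local_filepath="/tmp/my-file",
            remote_filepath="/my-file",
            operation=FTPOperation.GET,
            create_intermediate_dirs=True,
        )
        ...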
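To see why adding ftp_conn_id to template_fields is enough, here is a toy, stdlib-only model of the mechanism. It is a simplified sketch under stated assumptions, not Airflow's code: Airflow renders templates with Jinja2 and merges the trigger conf into params itself (when the dag_run_conf_overrides_params setting is enabled, which is the Airflow 2 default), whereas this sketch uses a hypothetical regex-based render() plus hypothetical ToyFTPOperator, BetterToyFTPOperator and resolve_params() helpers. The run_conf dict stands in for what a user passes at trigger time, for example with airflow dags trigger example_ftp_to_blob --conf '{"ftp_conn_id": "my-ftp-con"}'.

```python
import re
from typing import Any, Dict, Sequence

# Hypothetical stand-in for template rendering; Airflow itself uses Jinja2.
_PLACEHOLDER = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")


def render(value: Any, params: Dict[str, Any]) -> Any:
    """Replace every "{{ params.name }}" in a string with params[name]."""
    if not isinstance(value, str):
        return value
    return _PLACEHOLDER.sub(lambda m: str(params[m.group(1)]), value)


def resolve_params(defaults: Dict[str, Any], run_conf: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: values from the trigger conf override param defaults."""
    return {**defaults, **run_conf}


class ToyFTPOperator:
    """Toy operator: only attributes named in template_fields get rendered."""

    template_fields: Sequence[str] = ("remote_filepath", "local_filepath")

    def __init__(self, ftp_conn_id: str, local_filepath: str, remote_filepath: str) -> None:
        self.ftp_conn_id = ftp_conn_id
        self.local_filepath = local_filepath
        self.remote_filepath = remote_filepath

    def render_template_fields(self, params: Dict[str, Any]) -> None:
        # Fields not listed in template_fields are left untouched.
        for field in self.template_fields:
            setattr(self, field, render(getattr(self, field), params))


class BetterToyFTPOperator(ToyFTPOperator):
    # Same trick as the answer: append ftp_conn_id to the parent's fields.
    template_fields: Sequence[str] = (*ToyFTPOperator.template_fields, "ftp_conn_id")


if __name__ == "__main__":
    params = resolve_params({"ftp_conn_id": "default_ftp_conn"}, {"ftp_conn_id": "my-ftp-con"})
    for cls in (ToyFTPOperator, BetterToyFTPOperator):
        op = cls("{{ params.ftp_conn_id }}", "/tmp/my-file", "/my-file")
        op.render_template_fields(params)
        print(cls.__name__, op.ftp_conn_id)
```

Running it prints the unrendered placeholder for ToyFTPOperator and my-ftp-con for BetterToyFTPOperator, which mirrors the difference between FTPFileTransmitOperator and BetterFTPFileTransmitOperator in a real DAG run. With an empty run_conf, the default_ftp_conn default would be used instead.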