Search code examples
pythongoogle-cloud-platformairflowdataform

How do I invoke a Dataform workflow with tags using Airflow via GCP Cloud Composer?


I'm invoking a Dataform workflow in a python script running in Airflow (2.2.5) via GCP Cloud Composer (1.20.2).

It invokes the workflow run successfully, until I add the 'included_Tags' parameter (as detailed in the final code block here: https://cloud.google.com/dataform/docs/schedule-executions-composer).

My code for the compilation and workflow invocation is as follows:

create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": GIT_COMMITISH,
    }
)

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
        "invocation_config": { "included_Tags": ["dcm_raw"], "transitive_dependencies_included": true }
    },
    #start_date=days_ago(1),        
    timeout=None,
)

Running the above in my Airflow DAG, I receive the following error in the Airflow log for the 'create_workflow_invocation' task:

ValueError: Protocol message InvocationConfig has no "includedTags" field.


Solution

  • In "invocation_config", just rename "included_Tags" as "included_tags" and this will fix the error in above code.