
Default template variables in Airflow and BigQueryToGCSOperator


I have an Airflow DAG with the following task. At runtime it fails because the job_id contains special characters. How can I correctly pass the templated variables to the task definition?

from airflow import DAG
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

# default_args, scheduler_reports, report, bq_destination_table_name and output_file
# are defined elsewhere in the file.
date = '{{ ds }}'
job_id = date + '{{ ts }}'
inst_dag_id = "name_dag"

with DAG(dag_id=inst_dag_id,
         default_args=default_args,
         catchup=False,
         max_active_runs=6,
         schedule_interval=scheduler_reports[report]) as dag:  # schedule in UTC-0

    export_audits_to_gcs = bigquery_to_gcs.BigQueryToGCSOperator(
        task_id='export_audits_to_gcs',
        gcp_conn_id='google_cloud_default',
        compression="GZIP",
        source_project_dataset_table=bq_destination_table_name,
        destination_cloud_storage_uris=[output_file],
        field_delimiter=";",
        export_format='CSV',
        job_id=inst_dag_id + job_id
    )

The error is:

google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/name_proyect/jobs?prettyPrint=false: Invalid job ID "name_dag{{ ds }}{{ ts }}_6c6c293aa2fa1a63fadeaecefe7ff58c". Job IDs must be alphanumeric (plus underscores and dashes) and must be at most 1024 characters long.


Solution

  • I raised a PR to address this.

    For apache-airflow-providers-google>8.11.0:

    Your code will work as-is, since the PR is included in those releases.
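
    If your environment lets you upgrade the provider (for example through a requirements file, or Cloud Composer's PyPI packages list, depending on your setup), pin it above 8.11.0:

    apache-airflow-providers-google>8.11.0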

    For apache-airflow-providers-google<=8.11.0:

    This happens because job_id is not in template_fields.
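
    A quick way to confirm this, as a minimal check reusing the bigquery_to_gcs import from the DAG above, is to print the operator's template_fields: on the affected versions the tuple does not contain job_id, so Jinja never renders the {{ ds }} / {{ ts }} placeholders and the literal braces reach BigQuery:

    # On apache-airflow-providers-google <= 8.11.0 this tuple lacks "job_id",
    # so the value is sent to BigQuery exactly as written in the DAG file.
    print(bigquery_to_gcs.BigQueryToGCSOperator.template_fields)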

    If you cannot update the provider to a newer version, you can create a custom operator to address it:

    from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator


    class MyBigQueryToGCSOperator(BigQueryToGCSOperator):
        # Adding "job_id" to template_fields lets Airflow render its Jinja template.
        template_fields = (
            "job_id",
        ) + BigQueryToGCSOperator.template_fields


    Then replace the class in your code:

    export_audits_to_gcs = MyBigQueryToGCSOperator(
        task_id='export_audits_to_gcs',
        gcp_conn_id='google_cloud_default',
        compression="GZIP",
        source_project_dataset_table=bq_destination_table_name,
        destination_cloud_storage_uris=[output_file],
        field_delimiter=";",
        export_format='CSV',
        job_id=inst_dag_id + job_id
    )
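
    As an optional sanity check, you can verify that the subclass now treats job_id as a templated field, so the {{ ds }} / {{ ts }} macros will be substituted at runtime:

    # "job_id" should now appear alongside the provider's original templated fields.
    assert "job_id" in MyBigQueryToGCSOperator.template_fields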