I have an Airflow DAG with the following task. At runtime, it fails because there are special characters in the `job_id`. How can I correctly pass the templated variables to the task definition?
```python
date = '{{ ds }}'
job_id = date + '{{ ts }}'
inst_dag_id = "name_dag"

with DAG(
    dag_id=inst_dag_id,
    default_args=default_args,
    catchup=False,
    max_active_runs=6,
    schedule_interval=scheduler_reports[report],  # UTC-0 time
) as dag:
    export_audits_to_gcs = bigquery_to_gcs.BigQueryToGCSOperator(
        task_id='export_audits_to_gcs',
        gcp_conn_id='google_cloud_default',
        compression="GZIP",
        source_project_dataset_table=bq_destination_table_name,
        destination_cloud_storage_uris=[output_file],
        field_delimiter=";",
        export_format='CSV',
        job_id=inst_dag_id + job_id,
    )
```
The error is:

```
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/name_proyect/jobs?prettyPrint=false: Invalid job ID "name_dag{{ ds }}{{ ts }}_6c6c293aa2fa1a63fadeaecefe7ff58c". Job IDs must be alphanumeric (plus underscores and dashes) and must be at most 1024 characters long.
```
I raised a PR to address this.
For `apache-airflow-providers-google` > 8.11.0:

Your code will work as-is, since the PR is included in the release.
For `apache-airflow-providers-google` <= 8.11.0:

This happens because `job_id` is not in the operator's `template_fields`, so the Jinja placeholders are passed to BigQuery unrendered.
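Airflow only runs Jinja rendering on the attributes an operator lists in `template_fields`; any other argument is passed through verbatim, which is why the literal `{{ ds }}{{ ts }}` shows up in the rejected job ID. A rough stand-in for that rendering step (Airflow really uses Jinja; plain `str.replace` is enough to illustrate the idea):

```python
# Simplified stand-in for Airflow's template rendering (illustration only;
# the real implementation uses Jinja2 and the task's execution context).
def render(value: str, context: dict) -> str:
    for key, val in context.items():
        value = value.replace("{{ " + key + " }}", val)
    return value

context = {"ds": "2023-01-01", "ts": "2023-01-01T00:00:00+00:00"}

# job_id is NOT in template_fields, so BigQuery receives it unrendered:
unrendered = "name_dag" + "{{ ds }}" + "{{ ts }}"

# Once job_id is templated, the placeholders resolve before submission:
rendered = render(unrendered, context)
```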
If you cannot update the provider to a newer version, you can create a custom operator to address it:

```python
class MyBigQueryToGCSOperator(BigQueryToGCSOperator):
    template_fields = ("job_id",) + BigQueryToGCSOperator.template_fields
```
Then replace the class in your code:

```python
export_audits_to_gcs = MyBigQueryToGCSOperator(
    task_id='export_audits_to_gcs',
    gcp_conn_id='google_cloud_default',
    compression="GZIP",
    source_project_dataset_table=bq_destination_table_name,
    destination_cloud_storage_uris=[output_file],
    field_delimiter=";",
    export_format='CSV',
    job_id=inst_dag_id + job_id,
)
```
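One caveat: even once `job_id` is templated, `{{ ts }}` renders with colons and a UTC offset (e.g. `2023-01-01T00:00:00+00:00`), characters BigQuery also rejects. You can switch to the `{{ ds_nodash }}`/`{{ ts_nodash }}` macros, or sanitize the string yourself before passing it in. A minimal sketch of the latter (the `sanitize_job_id` helper is hypothetical, not part of the provider):

```python
import re

def sanitize_job_id(raw: str) -> str:
    # BigQuery job IDs may contain only letters, digits, underscores and
    # dashes, and must be at most 1024 characters long; replace anything
    # else with an underscore and truncate.
    return re.sub(r"[^0-9A-Za-z_-]", "_", raw)[:1024]

clean = sanitize_job_id("name_dag2023-01-01T00:00:00+00:00")
```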