I'm new at using Google Cloud Composer (Apache Airflow) and I'm trying to create a DAG that runs a script to INSERT some values to a BigQuery table (in a different project). DAGS directory is set up at gcs/dags/. BQ table is already created with correct fields and data types.
Whenever it's attempting to do the insert_to_bq
task, it always returns an error:
google.api_core.exceptions.BadRequest: 400 Braced constructors are not supported
Here's the code that I'm running:
...
SQL_INSERT = f"INSERT {BQ_TABLE_ID} ({BQ_INSERT_COLS}) VALUES('{VAR_1}', TIMESTAMP({{ts}}), '{VAR_3}')"
# DAG
with models.DAG(
'dag_name',
default_args = DEFAULT_ARGS,
schedule_interval = '@once'
) as dag:
...
insert_to_bq = BigQueryOperator(
task_id = "insert_to_bq_task",
write_disposition = 'WRITE_APPEND',
create_disposition = 'CREATE_NEVER',
sql = SQL_INSERT,
use_legacy_sql = False,
dag = dag
...
I've already tried using another operator but produces the same error:
...
insert_to_bq = BigQueryInsertJobOperator(
task_id = "insert_to_bq_task"
configuration = {
"query" : {
"query" : SQL_INSERT,
"useLegacySql" : False,
"destinationTable" : {
"projectId" : "project-id",
"datasetId" : "dataset_id",
"tableId" : "table_id"
},
"writeDisposition" : "WRITE_APPEND",
}
},
location = "my_location"
)
I've already searched for the error code it's giving but cannot find any informative descriptions on why it's happening. Currently stuck at this and I deeply appreciate on any tips, advice, or anything that can help me resolve this issue. Let me know if there is anything else that I need to provide to further understand and troubleshoot the issue.
UPDATE:
Resolved by adding a parameter called params
in BigQueryOperator which consists of a dictionary for the values you want to dynamically put into variables.
SQL_INSERT = f'''
INSERT {{ params.table_id }} {{ params.columns }}
VALUES('{{ params.var_string }}', TIMESTAMP({{ params.timestamp }}))
'''
...
insert_to_bq = BigQueryOperator(
task_id="insert_to_bq_task",
write_disposition='WRITE_APPEND',
create_disposition='CREATE_NEVER',
sql=SQL_INSERT,
use_legacy_sql=False,
params={
'table_id':'bq_table_id',
'columns':'bq_columns',
'var_string':'string_value',
'timestamp':datetime.now(),
}
)
...
I think your problem is not due to your use of Operators BigQueryOperator
or BigQueryInsertJobOperator
, but to the way you build your SQL query.
Check this link that indicate the same kind of error when executing an SQL
query with BigQuery
.
Please try to execute your code with a simple query without dynamic parameters then find the good way to add these parameter later.