Search code examples
python-3.xdirected-acyclic-graphsgoogle-cloud-composerairflow-2.x

400 Bad Request error attempting to insert values into Google BigQuery


I'm new at using Google Cloud Composer (Apache Airflow) and I'm trying to create a DAG that runs a script to INSERT some values to a BigQuery table (in a different project). DAGS directory is set up at gcs/dags/. BQ table is already created with correct fields and data types.

Whenever it's attempting to do the insert_to_bq task, it always returns an error: google.api_core.exceptions.BadRequest: 400 Braced constructors are not supported

Here's the code that I'm running:

...
SQL_INSERT = f"INSERT {BQ_TABLE_ID} ({BQ_INSERT_COLS}) VALUES('{VAR_1}', TIMESTAMP({{ts}}), '{VAR_3}')"

# DAG
with models.DAG(
    'dag_name',
    default_args = DEFAULT_ARGS,
    schedule_interval = '@once'
) as dag:
    ...
    insert_to_bq = BigQueryOperator(
        task_id = "insert_to_bq_task",
        write_disposition = 'WRITE_APPEND',
        create_disposition = 'CREATE_NEVER',
        sql = SQL_INSERT,
        use_legacy_sql = False,
        dag = dag
    ...

I've already tried using another operator but produces the same error:

...
insert_to_bq = BigQueryInsertJobOperator(
    task_id = "insert_to_bq_task"
    configuration = {
        "query" : {
            "query" : SQL_INSERT,
            "useLegacySql" : False,
            "destinationTable" : {
                "projectId" : "project-id",
                "datasetId" : "dataset_id",
                "tableId" : "table_id"
            },
            "writeDisposition" : "WRITE_APPEND",
        }
    },
    location = "my_location"
)

I've already searched for the error code it's giving but cannot find any informative descriptions on why it's happening. Currently stuck at this and I deeply appreciate on any tips, advice, or anything that can help me resolve this issue. Let me know if there is anything else that I need to provide to further understand and troubleshoot the issue.


UPDATE:

Resolved by adding a parameter called params in BigQueryOperator which consists of a dictionary for the values you want to dynamically put into variables.

SQL_INSERT = f'''
INSERT {{ params.table_id }} {{ params.columns }}
VALUES('{{ params.var_string }}', TIMESTAMP({{ params.timestamp }}))
'''
...
insert_to_bq = BigQueryOperator(
    task_id="insert_to_bq_task",
    write_disposition='WRITE_APPEND',
    create_disposition='CREATE_NEVER',
    sql=SQL_INSERT,
    use_legacy_sql=False,
    params={
        'table_id':'bq_table_id',
        'columns':'bq_columns',
        'var_string':'string_value',
        'timestamp':datetime.now(),
    }
)
...

Solution

  • I think your problem is not due to your use of Operators BigQueryOperator or BigQueryInsertJobOperator, but to the way you build your SQL query.

    Check this link that indicate the same kind of error when executing an SQL query with BigQuery.

    Please try to execute your code with a simple query without dynamic parameters then find the good way to add these parameter later.