I am trying to use the Airflow operator BigQueryCreateEmptyTableOperator to check whether a BigQuery table exists, create it with a given schema if it does not, and move on to the next operator if it does.
My operator constructor reads:
check_and_create_table = BigQueryCreateEmptyTableOperator(
    task_id='check_and_create_final_table',
    project_id=GCP_PROJECT,
    dataset_id=BQ_DATASET,
    table_id=BQ_TABLE,
    schema_fields=get_final_schema(),
    if_exists='skip',
)
According to the Airflow docs, the if_exists parameter should be set to 'skip' to achieve this. Unfortunately, I am getting a DAG import error in Airflow, which reads:
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 408, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 756, in __init__
    raise AirflowException(
airflow.exceptions.AirflowException: Invalid arguments were passed to BigQueryCreateEmptyTableOperator (task_id: check_and_create_final_table). Invalid arguments were:
**kwargs: {'if_exists': 'skip'}
If anyone has any ideas, they would be much appreciated.
if_exists was implemented in Airflow version 2.6, and exist_ok was deprecated in its favor.
For versions before 2.6, you should use exist_ok as an alternative.
This is the PR
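
If the same DAG file has to load on both older and newer installations, one option is to pick the argument at import time. The following is only a minimal sketch: existence_kwargs is a hypothetical helper (not part of Airflow), and it assumes the operator's __init__ signature can be introspected and that exist_ok=True gives the "ignore an existing table" behavior on older versions. The two stand-in classes are made up so the snippet runs without Airflow installed.

```python
import inspect


def existence_kwargs(operator_cls):
    """Return the keyword argument that tolerates an already existing table.

    Hypothetical helper: prefers if_exists='skip' when the operator's
    __init__ declares an if_exists parameter, else falls back to exist_ok=True.
    """
    params = inspect.signature(operator_cls.__init__).parameters
    if "if_exists" in params:
        return {"if_exists": "skip"}
    return {"exist_ok": True}


# Stand-ins for the newer and older operator signatures (assumptions made
# for illustration only, not the real Airflow classes).
class NewStyleOperator:
    def __init__(self, task_id, if_exists="log", **kwargs):
        self.task_id = task_id
        self.if_exists = if_exists


class OldStyleOperator:
    def __init__(self, task_id, exist_ok=False, **kwargs):
        self.task_id = task_id
        self.exist_ok = exist_ok


print(existence_kwargs(NewStyleOperator))
print(existence_kwargs(OldStyleOperator))
```

In a real DAG the result would be unpacked into the constructor from the question, e.g. BigQueryCreateEmptyTableOperator(task_id=..., **existence_kwargs(BigQueryCreateEmptyTableOperator)). If you only target versions before 2.6, simply replacing if_exists='skip' with exist_ok=True is the simpler fix.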