I need to count the number of rows in a table and use the row count in the filename of an export to GCS. The following is an excerpt from my DAG.
with models.DAG(
    'my_dag',
    schedule_interval = '0 6 * * 1',
    start_date = datetime(2022, 1, 1),
    catchup = False
) as dag:
    # create segment filtered views to output CSV to GCS
    def prepareSegmentTables(segment, **kwargs):
        segment_table_queries = f"""
            TRUNCATE TABLE dataset.some_table;
            INSERT INTO dataset.some_table (column1)
            SELECT DISTINCT column1
            FROM dataset.some_other_table
            WHERE column2 = '{ segment['id'] }';
        """
        # execute query
        client.query(segment_table_queries).result()
        # store the row counts of each type
        kwargs['ti'].xcom_push(
            key = "ROW_COUNTS",
            value = {
                "column1": getTableRowCount("dataset.some_table"),
            }
        )

    def get_row_counts(segment, **kwargs):
        ROW_COUNTS = kwargs['ti'].xcom_pull(
            key = "ROW_COUNTS",
            task_ids = [ f"prepare_segment_tables" ]
        )

    #tasks
    prepare_segment_tables = PythonOperator(
        task_id = f"prepare_segment_tables",
        python_callable = prepareSegmentTables,
        op_kwargs = { "segment": segment },
        dag = dag
    )
    export_to_gcs = BigQueryToCloudStorageOperator(
        task_id = f"gcs_lr_to_li_auid_{segment['id']}",
        source_project_dataset_table = f"{GCP_PROJECT}.{DATASET_NAME}.some_table",
        destination_cloud_storage_uris = f"gs://{GCS_BUCKET}/{FILENAME_PATH}{segment['name']}_"
            + str( ti.xcom_pull(key = "ROW_COUNTS", task_ids = [ f"prepare_segment_tables" ])[0].column1 )
            + f"_{TODAY_STR}.csv",
        # this works though
        # destination_cloud_storage_uris = f"gs://{GCS_BUCKET}/{FILENAME_PATH}{segment['name']}_" + str( getTableRowCount("dataset.some_table") ) + f"_{TODAY_STR}.csv",
        compression = 'NONE', export_format = 'CSV', field_delimiter = ',', print_header = True
    )
    prepare_segment_tables >> export_to_gcs
As can be seen, I am pushing ROW_COUNTS into xcom while calling prepareSegmentTables via a PythonOperator. When I do xcom_pull inside another PythonOperator, calling get_row_counts, it properly pulls the value, but when I pass the same syntax as a parameter to BigQueryToCloudStorageOperator or BigQueryToGCSOperator, it throws an error.
It says ti or kwargs['ti'], depending on which I use, is undefined. Some people suggest using double {{ }}, and even that didn't work for me.
For now, I have resorted to calling getTableRowCount() directly in the parameter instead of first storing it in a variable. It works, but I use the filename downstream at least one more time, and this approach results in unnecessarily querying the table for a row count multiple times.
Any help getting xcom to work or to figure out a way to get row count in the filename efficiently is appreciated.
Airflow is a distributed system. It is important to note that your DAG code isn't executed all in the same context.
The DAG is parsed and assembled on a schedule. The Task is executed on a worker. XCOM is available inside the worker context.
As you saw, ti or kwargs['ti'] lets you access XCOMs from inside a PythonOperator (specifically the python_callable), but in the arguments to your BigQueryToCloudStorageOperator you don't have task_instance available, because that code runs when the DAG is parsed, not in the worker context.
You can use Jinja Templating to defer fetching the XCOM until you are in the correct context with the worker (read more here https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html )
You probably need something like:
BigQueryToCloudStorageOperator(
    ...
    destination_cloud_storage_uris = f"gs://{GCS_BUCKET}/{FILENAME_PATH}{segment['name']}_" + "{{ ti.xcom_pull(key='ROW_COUNTS', task_ids=['prepare_segment_tables'])[0]['column1'] }}" + f"_{TODAY_STR}.csv",
    ...
)
Note that I am being careful not to mix f-strings and Jinja there, as they both use the {} syntax and don't play well together.
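To make the f-string and Jinja clash concrete, here is a small sketch in plain Python (no Airflow needed). The segment dict and bucket name are made-up stand-ins, not values from the question. It shows that inside an f-string, {{ and }} are just escapes for single literal braces, which may be why the "double {{ }}" suggestion appeared not to work.

```python
# Stand-in value for illustration only; not taken from the real DAG.
segment = {"name": "segment_a"}

# Inside an f-string, "{{" and "}}" collapse to single literal braces,
# so the Jinja delimiters are gone before Airflow ever sees the string.
broken = f"gs://bucket/{segment['name']}_{{ ti.xcom_pull(key='ROW_COUNTS') }}.csv"

# Keeping the Jinja part in a plain (non-f) string preserves the delimiters.
safe = f"gs://bucket/{segment['name']}_" + "{{ ti.xcom_pull(key='ROW_COUNTS') }}" + ".csv"

# Alternative: quadruple the braces if everything must stay in one f-string.
escaped = f"gs://bucket/{segment['name']}_{{{{ ti.xcom_pull(key='ROW_COUNTS') }}}}.csv"

print(broken)
print(safe)
print(escaped)
```

The first printed line contains only single braces, so Jinja would leave it untouched; the second and third are identical and still carry the {{ ... }} expression for Airflow to render at run time.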
You will also probably need to ensure that your XCOM parses as you expect in Jinja, and you may want to return your XCOM differently or parse it in advance with a PythonOperator.
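One way to "return your XCOM differently", sketched under assumptions: build the whole URI once in the first task and return it, so the template downstream is a plain pull with no indexing. The helper name build_export_uri and the sample values are hypothetical; the wiring is shown only in comments because it depends on your DAG.

```python
def build_export_uri(bucket, path_prefix, segment_name, row_count, date_str):
    """Build the full GCS URI once, so the row count is only queried once."""
    return f"gs://{bucket}/{path_prefix}{segment_name}_{row_count}_{date_str}.csv"


# Stand-in values for illustration only.
uri = build_export_uri("my-bucket", "exports/", "segment_a", 1234, "20220103")
print(uri)

# Hypothetical wiring inside the DAG (not executed here):
#   - end prepareSegmentTables with
#       return build_export_uri(GCS_BUCKET, FILENAME_PATH, segment['name'],
#                               getTableRowCount("dataset.some_table"), TODAY_STR)
#     A python_callable's return value is pushed to XCom under the
#     default key "return_value".
#   - in any templated field of a downstream task, pull it back with
#       "{{ ti.xcom_pull(task_ids='prepare_segment_tables') }}"
```

Since the filename is needed downstream more than once, every later task can pull the same string instead of calling getTableRowCount() again.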
See here for help with what you can do within a Jinja template: https://jinja.palletsprojects.com/
Note also that this works only because "destination_cloud_storage_uris" is a templated field on this operator - and not all fields are. https://airflow.apache.org/docs/apache-airflow/1.10.15/_modules/airflow/contrib/operators/bigquery_to_gcs.html#BigQueryToCloudStorageOperator