
Airflow BigQueryInsertJobOperator does not render jinja2 template file when called manually


I am trying to call BigQueryInsertJobOperator manually inside a PythonOperator like below, but the SQL template is not rendered:

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

def some_func(**kwargs):
    # ...
    append_tmp_to_raw_and_clean_after = BigQueryInsertJobOperator(
        task_id="append_tmp_to_raw",
        gcp_conn_id=GCP_CONN_ID,
        configuration={
            "query": {
                "query": '{% include "sql/merge_stage_raw_vehicle_position.sql" %}',
                "useLegacySql": False,
            }
        },
        params=params,
    )
    append_tmp_to_raw_and_clean_after.execute(context=kwargs)
    # ...

If I use it as part of a DAG, it works correctly, but when the operator is called manually, it creates a query that is literally "{% include "sql/merge_stage_raw_vehicle_position.sql" %}" (according to BigQuery logs), which means template rendering was not done.

I thought maybe I needed to call some method to render the SQL explicitly, but after looking at the source code of BigQueryInsertJobOperator, I only found "prepare_template", and it does not use Jinja2.

At this point, I think Airflow does some magic as part of the DAG to render Jinja templates and substitute the SQL queries, but because I call the operator manually inside a PythonOperator, this does not happen.

I understand that I can manually render the Jinja2 template and pass it as a string, but I was curious whether I simply forgot to call some method that renders the SQL. That would be ideal.

Thank you very much.

Airflow version: 2.8.0


Solution

  • I think Airflow does some magic as part of DAG to render Jinja templates and replace SQL queries

    It's part of the sequence of launching a task on a worker for execution: one of those steps is rendering the Jinja strings. The pattern of an operator inside an operator is bad practice (and the main reason for that is exactly what you are experiencing).

    The sequence of launching a task has many steps. When you invoke an operator inside an operator, you are responsible for handling all of these steps yourself, because Airflow has no idea that the inner class is an operator; the scheduler does not look inside your Python callable. I explained this in more detail in https://stackoverflow.com/a/65168908/14624409
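    Concretely, the rendering step that the worker performs is ordinary Jinja templating over the operator's template_fields, using an environment whose loader knows the DAG's template_searchpath. Calling execute() directly skips that step, so the bare string survives. A minimal sketch of what that step does (a DictLoader stands in for the searchpath, and the SQL body is a placeholder, not the real merge statement):

```python
from jinja2 import DictLoader, Environment

# Stand-in for the DAG's template_searchpath: maps template names to content.
# The SQL body here is illustrative only.
loader = DictLoader({
    "sql/merge_stage_raw_vehicle_position.sql":
        "MERGE raw.vehicle_position t USING stage.vehicle_position s ON t.id = s.id"
})
env = Environment(loader=loader)

# This is the string the operator holds before rendering.
raw_value = '{% include "sql/merge_stage_raw_vehicle_position.sql" %}'

# What the task-launch sequence does for each templated field:
# render the raw string through the environment, resolving the include.
rendered = env.from_string(raw_value).render()
print(rendered)  # the included SQL, not the literal {% include %} string
```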

    You didn't explain what you are after, but one of the following would probably be an alternative that avoids the bad pattern:

    1. Use the relevant hook inside the PythonOperator for the functionality you need.
    2. Create a custom operator that handles all the needed functionality end to end (thus avoiding the need to wrap BigQueryInsertJobOperator in a PythonOperator).
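    A rough sketch of option 1, assuming the google provider's BigQueryHook with its insert_job method; the connection id, SQL, and function names here are illustrative, and you would render the SQL yourself (e.g. with jinja2) before handing it to the hook, since nothing inside a PythonOperator callable is templated for you:

```python
def build_query_config(sql: str) -> dict:
    """Build the BigQuery job configuration dict passed to insert_job."""
    return {"query": {"query": sql, "useLegacySql": False}}


def append_tmp_to_raw(**kwargs):
    # Imported lazily so the sketch parses without the provider installed.
    from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

    sql = "MERGE ..."  # placeholder for the SQL you rendered yourself
    hook = BigQueryHook(gcp_conn_id="my_gcp_conn")  # illustrative connection id
    hook.insert_job(configuration=build_query_config(sql))
```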

    Moving forward, the Airflow community is discussing how to address this (see mailing list thread). I already raised this use case in the thread and pointed out that users expect the pattern of an operator inside an operator to simply work out of the box. It's too soon to say what will come out of it, but for the moment, one of the alternatives I suggested should work just fine.