I have a DAG that fetches a SQL script via a REST API and then executes it:
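from airflow.providers.common.sql.operators.sql import SQLCheckOperator
from airflow.providers.http.operators.http import SimpleHttpOperator

SimpleHttpOperator(
    task_id='get_sql_query',
    endpoint='path/to/sql_file.sql',
    method='GET',
    http_conn_id='some_http_conn_id',
    log_response=True,
) >> SQLCheckOperator(
    task_id='check_something',
    # Pulls the raw SQL text returned by the HTTP task
    sql='{{ ti.xcom_pull(task_ids="get_sql_query") }}',
    params={"table_name": "test_table"},
    conn_id='postgres_default',
)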
Here are the contents of sql_file.sql:
SELECT
    COUNT(1) = COUNT(DISTINCT id)
FROM
    {{ params.table_name }}
Right now it doesn't render table_name and instead tries to execute {{ params.table_name }} literally.
But I want to set table_name inside my DAG. Is that even possible?
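The literal placeholder is consistent with how a templated field is rendered: the sql string goes through a single Jinja pass, so the value returned by xcom_pull is pasted in as plain text and any Jinja it contains is not rendered again. A minimal sketch with plain jinja2, outside Airflow and assuming Airflow's rendering behaves like one Template.render call, shows the effect:

```python
# Sketch outside Airflow: assumes a templated field is rendered with a single
# Jinja pass, which is what one Template.render call does here.
from jinja2 import Template

# What the upstream task would push to XCom: SQL that still contains Jinja.
fetched_sql = "SELECT COUNT(DISTINCT id) > 1 FROM {{ params.table_name }}"

# The operator's `sql` field acts as the outer template. The XCom value is
# substituted as plain text, so its own placeholder survives untouched.
outer = Template("{{ fetched }}")
rendered_once = outer.render(fetched=fetched_sql, params={"table_name": "test_table"})
print(rendered_once)

# Only a second, explicit render fills in the table name.
rendered_twice = Template(rendered_once).render(params={"table_name": "test_table"})
print(rendered_twice)
```

The first print still shows {{ params.table_name }}; only the second shows test_table, which is why a separate rendering step is needed.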
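EDIT: I realized that on my end the Jinja template in the SQL file works both with and without quotes around it. And yes, SQL fetched from an upstream operator does not get templated: the sql field is rendered once, so {{ ti.xcom_pull(...) }} is replaced by the fetched text, but the Jinja inside that text is not rendered again. There is a workaround, though: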
from airflow.decorators import dag, task
from airflow.providers.common.sql.operators.sql import SQLCheckOperator
from pendulum import datetime


@dag(
    start_date=datetime(2023, 2, 2),
    schedule=None,
    catchup=False,
)
def testing():
    @task  # replace this with your SimpleHTTP operator
    def fetch_sql_query():
        return """SELECT
    COUNT(DISTINCT id) > 1
FROM
    {table_name}"""

    @task  # have a task in the middle that fills in the templates
    def create_sql(query, table_name):
        return query.format(table_name=table_name)

    t2 = SQLCheckOperator(
        task_id='check_something',
        sql="{{ ti.xcom_pull(task_ids='create_sql') }}",
        conn_id='postgres_default',
    )

    create_sql(fetch_sql_query(), "ab_permission") >> t2


testing()
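The workaround above uses str.format, which means the SQL file must use {table_name} instead of the {{ params.table_name }} syntax from the question. If you would rather keep the file unchanged, the middle task could render it with Jinja instead. This is a sketch, not a confirmed Airflow feature: render_sql is a hypothetical helper, and it relies on jinja2, which Airflow itself depends on.

```python
# Hypothetical helper for the middle task: renders the fetched SQL with Jinja
# instead of str.format, so the file can keep {{ params.table_name }}.
from jinja2 import StrictUndefined, Template


def render_sql(query: str, table_name: str) -> str:
    """Fill Jinja placeholders in `query`, mimicking Airflow's `params` name."""
    # StrictUndefined makes a typo such as {{ params.tabel_name }} raise an
    # error instead of silently rendering as an empty string.
    template = Template(query, undefined=StrictUndefined)
    return template.render(params={"table_name": table_name})


# In the DAG it would replace the body of the original middle task:
#   @task
#   def create_sql(query, table_name):
#       return render_sql(query, table_name)

fetched = """SELECT
    COUNT(1) = COUNT(DISTINCT id)
FROM
    {{ params.table_name }}"""
print(render_sql(fetched, "test_table"))
```

A side benefit of rendering with Jinja here is that literal braces in the SQL, such as a Postgres array literal '{1,2}', pass through unchanged, whereas str.format would raise an error on them.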