I have been having issues trying to template an Airflow Variable into a PostgresOperator SQL script. My SQL script looks like:
UNLOAD('SELECT *, trunc(updated_at) as dt FROM prodreadcopy.cmd_{{ params.table_name }}')
TO 's3://{{ params.datalake_bucket }}/bronze/learning/{{ params.table_name }}/'
IAM_ROLE 'arn:aws:iam::1234567890:role/RedshiftETLRole'
PARTITION BY (dt)
PARQUET
ALLOWOVERWRITE;
The issue at hand is the datalake_bucket value. When I use the normal PostgresOperator:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.task_group import TaskGroup

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 11),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
    'on_failure_callback': notification_fail,
    'on_retry_callback': notification_retry,
}

with DAG(
    'learning_bronze_dag',
    default_args=default_args,
    description='Load all learning data to bronze',
    catchup=False,
    max_active_runs=1,
    schedule_interval='@daily',
) as dag:
    starter = DummyOperator(task_id="starter")

    tables = [
        'course_templates',
        'courses',
        'course_items',
        'course_events',
        'organization_permissions',
    ]

    with TaskGroup('unload_tasks') as redshift_unload:
        for table in tables:
            op = PostgresOperator(
                task_id=table,
                dag=dag,
                postgres_conn_id=default_connection,
                autocommit=True,
                params={
                    'table_name': table,
                    'datalake_bucket': '{{var.value.datalake_bucket}}',
                },
                sql='sql/load_learning_to_s3.sql'
            )

    chain(starter, redshift_unload)
I get SQL errors on that task:
psycopg2.errors.InternalError_: S3ServiceException:The specified bucket is not valid.
Failed to initialize S3 output stream. S3 path:
s3://{{var.value.datalake_bucket}}/bronze/learning/organization_permissions/dt=2023-05-02/0002_part_00.parquet
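From what I can tell, the stock operator only templates the sql field, so the string in params is handed to Jinja as a literal. A quick check (assuming the standard postgres provider package) shows which fields are templated:

from airflow.providers.postgres.operators.postgres import PostgresOperator

# Only 'sql' is listed, so anything placed in `params` is passed through
# as a literal string rather than being rendered by Jinja.
print(PostgresOperator.template_fields)  # expected output: ('sql',)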
So I wrote a small custom operator to make the params field a templated field:
from airflow.models.dag import DAG
from airflow.models.baseoperator import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.decorators import apply_defaults
from typing import Optional


class CustomPostgresOperator(BaseOperator):
    """Allows templating of parameter fields in
    a postgres operator
    """

    template_fields = ('params', 'sql')
    template_fields_renderers = {
        'sql': 'sql'
    }
    template_ext = ('.sql',)
    ui_color = '#ededed'

    @apply_defaults
    def __init__(
        self,
        *,
        sql: str,
        autocommit: bool = False,
        postgres_conn_id: str = 'redshift_default',
        params: Optional[dict] = None,
        database: Optional[str] = None,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.postgres_conn_id = postgres_conn_id
        self.params = params or {}
        self.hook = None
        self.sql = sql
        self.autocommit = autocommit
        self.database = database

    def execute(self, **kwargs):
        self.log.info('Executing: %s', self.sql)
        # adding this for visibility
        self.log.info(f'Templating {self.params}')
        self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database)
        self.hook.run(self.sql, self.autocommit, parameters=self.params)
        for output in self.hook.conn.notices:
            self.log.info(output)
I can see that the log output shows my bucket variable being templated:
[2023-06-30 20:31:10,486] {{postgres.py:40}} INFO - Templating {'table_name': 'organization_permissions', 'datalake_bucket': 'my-bucket'}
But in the same log output it's still showing:
psycopg2.errors.InternalError_: S3ServiceException:The specified bucket is not valid
Failed to initialize S3 output stream. S3 path: s3://{{var.value.datalake_bucket}}...
Why can't I get the datalake_bucket variable rendered into the SQL?
I am using Amazon Managed Workflows for Apache Airflow (MWAA) version 2.0.2. I am not in a position to upgrade at this point; I am just trying to understand why the parameters aren't working properly.
The log.info output for both log statements looks like the following:
[2023-06-30 20:31:10,466] {{postgres.py:39}} INFO - Executing: UNLOAD('SELECT *, trunc(updated_at) as dt FROM prodreadcopy.cmd_organization_permissions')
TO 's3://{{var.value.datalake_bucket}}/bronze/learning/organization_permissions/'
IAM_ROLE 'arn:aws:iam::1234567890:role/RedshiftETLRole'
PARTITION BY (dt)
PARQUET
ALLOWOVERWRITE;
[2023-06-30 20:31:10,486] {{postgres.py:40}} INFO - Templating {'table_name': 'organization_permissions', 'datalake_bucket': 'my-bucket'}
After a bunch of trial and error, it turns out this was an X-Y problem. I can reference templated variables directly in the SQL script itself, so I should be using a mixture of params for fixed string values and Jinja template variables ({{ var.value... }}) in the SQL script. As far as I can tell, the rendered SQL is never passed through Jinja a second time, so {{ params.datalake_bucket }} just expands to the literal string {{var.value.datalake_bucket}} and stays that way, which is why the custom operator didn't help. My original script was:
UNLOAD('SELECT *, trunc(updated_at) as dt FROM prodreadcopy.cmd_{{ params.table_name }}')
TO 's3://{{ params.datalake_bucket }}/bronze/learning/{{ params.table_name }}/'
IAM_ROLE 'arn:aws:iam::1234567890:role/RedshiftETLRole'
PARTITION BY (dt)
PARQUET
ALLOWOVERWRITE;
But this really should be:
UNLOAD('SELECT *, trunc(updated_at) as dt FROM prodreadcopy.cmd_{{ params.table_name }}')
-- note the var.value.key format
TO 's3://{{ var.value.datalake_bucket }}/bronze/learning/{{ params.table_name }}/'
IAM_ROLE 'arn:aws:iam::1234567890:role/RedshiftETLRole'
PARTITION BY (dt)
PARQUET
ALLOWOVERWRITE;
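For the var.value.datalake_bucket lookup to resolve, the Variable has to exist in the Airflow metadata (set through the UI under Admin > Variables in MWAA). A quick sanity check from inside the environment, assuming the Variable key is datalake_bucket:

from airflow.models import Variable

# Should print the bucket name (e.g. 'my-bucket') rather than raising KeyError;
# 'datalake_bucket' is the Variable key assumed throughout this post.
print(Variable.get('datalake_bucket'))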
So my PostgresOperator looks like:
with TaskGroup('unload_tasks') as redshift_unload:
    for table in tables:
        op = PostgresOperator(
            task_id=table,
            dag=dag,
            postgres_conn_id=default_connection,
            autocommit=True,
            params={
                'table_name': table  # only this value gets a param
            },
            sql='sql/load_learning_to_s3.sql'
        )
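For completeness, the reason the custom operator approach didn't work appears to be that the SQL template is rendered only once. A minimal illustration with plain Jinja, outside of Airflow (bucket value made up for the example):

from jinja2 import Template

# The SQL template is rendered a single time. If params itself contains a
# Jinja expression, that expression comes through as a literal string;
# nothing renders it against the Airflow Variable afterwards.
sql = Template("TO 's3://{{ params.datalake_bucket }}/bronze/learning/'")
rendered = sql.render(params={'datalake_bucket': '{{var.value.datalake_bucket}}'})
print(rendered)
# TO 's3://{{var.value.datalake_bucket}}/bronze/learning/'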