Tags: python, postgresql, amazon-s3, airflow, mwaa

Issue with templating in Airflow PostgresOperator


I have been having trouble templating an Airflow Variable into a PostgresOperator SQL script. My SQL script looks like this:

UNLOAD('SELECT *, trunc(updated_at) as dt FROM prodreadcopy.cmd_{{ params.table_name }}')
TO 's3://{{ params.datalake_bucket }}/bronze/learning/{{ params.table_name }}/'
IAM_ROLE 'arn:aws:iam::1234567890:role/RedshiftETLRole'
PARTITION BY (dt)
PARQUET
ALLOWOVERWRITE;

The issue at hand is the datalake_bucket. When I use the normal PostgresOperator:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.task_group import TaskGroup

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 11),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
    'on_failure_callback': notification_fail,
    'on_retry_callback': notification_retry,
}

with DAG(
    'learning_bronze_dag',
    default_args=default_args,
    description='Load all learning data to bronze',
    catchup=False,
    max_active_runs=1,
    schedule_interval='@daily',
) as dag:

    starter = DummyOperator(task_id="starter")

    tables = [
        'course_templates',
        'courses',
        'course_items',
        'course_events',
        'organization_permissions',
    ]

    with TaskGroup('unload_tasks') as redshift_unload:
        for table in tables:
            op = PostgresOperator(
                task_id=table,
                dag=dag,
                postgres_conn_id=default_connection,
                autocommit=True,
                params={
                    'table_name': table,
                    'datalake_bucket': '{{var.value.datalake_bucket}}',
                },
                sql='sql/load_learning_to_s3.sql'
            )

    chain(starter, redshift_unload)

I get SQL errors on that task:

psycopg2.errors.InternalError_: S3ServiceException:The specified bucket is not valid.
Failed to initialize S3 output stream. S3 path: 
s3://{{var.value.datalake_bucket}}/bronze/learning/organization_permissions/dt=2023-05-02/0002_part_00.parquet
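
For reference, the stock PostgresOperator only lists sql in its template_fields, which is why the Jinja string inside params survives untouched all the way to Redshift. A quick way to confirm this (a minimal check, assuming the postgres provider that ships with MWAA 2.0.2):

from airflow.providers.postgres.operators.postgres import PostgresOperator

# Only the fields named in template_fields are rendered by Jinja before execute()
# runs; for this operator that is just 'sql', so values inside params stay literal.
print(PostgresOperator.template_fields)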

So I wrote a small custom operator to make params a templated field:

from typing import Optional

from airflow.models.baseoperator import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.decorators import apply_defaults


class CustomPostgresOperator(BaseOperator):
    """Allows templating of parameter fields in
    a postgres operator
    """
    template_fields = ('params', 'sql')
    template_fields_renderers = {
        'sql': 'sql'
    }
    template_ext = ('.sql',)
    ui_color = '#ededed'

    @apply_defaults
    def __init__(
        self,
        *,
        sql: str,
        autocommit: bool = False,
        postgres_conn_id: str = 'redshift_default',
        params: Optional[dict] = None,
        database: Optional[str] = None,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.postgres_conn_id = postgres_conn_id
        self.params = params or {}
        self.hook = None
        self.sql = sql
        self.autocommit = autocommit
        self.database = database

    def execute(self, context):
        self.log.info('Executing: %s', self.sql)

        # adding this for visibility
        self.log.info(f'Templating {self.params}')

        self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database)
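        # Note: hook.run()'s `parameters` argument is passed through to psycopg2 as
        # DB-API bind parameters; it does not re-render any Jinja left in self.sql.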
        self.hook.run(self.sql, self.autocommit, parameters=self.params)
        for output in self.hook.conn.notices:
            self.log.info(output)

I can see that the log output shows my bucket variable being templated:

[2023-06-30 20:31:10,486] {{postgres.py:40}} INFO - Templating {'table_name': 'organization_permissions', 'datalake_bucket': 'my-bucket'}

But in the same log output it's still showing:

psycopg2.errors.InternalError_: S3ServiceException:The specified bucket is not valid
Failed to initialize S3 output stream. S3 path: s3://{{var.value.datalake_bucket}}...

Why isn't the datalake_bucket variable making it into the rendered SQL?

I am using Amazon Managed Workflows for Apache Airflow (MWAA) version 2.0.2. I am not in a position to upgrade at this point; I just want to understand why the params aren't being templated properly.

Edit

The output from both log.info statements looks like the following:

[2023-06-30 20:31:10,466] {{postgres.py:39}} INFO - Executing: UNLOAD('SELECT *, trunc(updated_at) as dt FROM prodreadcopy.cmd_organization_permissions')
TO 's3://{{var.value.datalake_bucket}}/bronze/learning/organization_permissions/'
IAM_ROLE 'arn:aws:iam::1234567890:role/RedshiftETLRole'
PARTITION BY (dt)
PARQUET
ALLOWOVERWRITE;
[2023-06-30 20:31:10,486] {{postgres.py:40}} INFO - Templating {'table_name': 'organization_permissions', 'datalake_bucket': 'mybucket'}

Solution

  • After a bunch of trial and error, it turns out this is an X-Y problem. Airflow Variables can be referenced directly in the SQL script itself with Jinja, so I should use params only for fixed string values and reference the Variable as {{ var.value.datalake_bucket }} in the SQL script. My original script was:

    UNLOAD('SELECT *, trunc(updated_at) as dt FROM prodreadcopy.cmd_{{ params.table_name }}')
    TO 's3://{{ params.datalake_bucket }}/bronze/learning/{{ params.table_name }}/'
    IAM_ROLE 'arn:aws:iam::1234567890:role/RedshiftETLRole'
    PARTITION BY (dt)
    PARQUET
    ALLOWOVERWRITE;
    

    But this really should be:

    UNLOAD('SELECT *, trunc(updated_at) as dt FROM prodreadcopy.cmd_{{ params.table_name }}')
    -- note the var.value.key format
    TO 's3://{{ var.value.datalake_bucket }}/bronze/learning/{{ params.table_name }}/'
    IAM_ROLE 'arn:aws:iam::1234567890:role/RedshiftETLRole'
    PARTITION BY (dt)
    PARQUET
    ALLOWOVERWRITE;
    

    So my PostgresOperator looks like:

        with TaskGroup('unload_tasks') as redshift_unload:
            for table in tables:
                op = PostgresOperator(
                    task_id=table,
                    dag=dag,
                    postgres_conn_id=default_connection,
                    autocommit=True,
                    params={
                        'table_name': table,  # only this value is passed via params
                    },
                    sql='sql/load_learning_to_s3.sql'
                )
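
    As a sanity check, {{ var.value.datalake_bucket }} only resolves if an Airflow Variable named datalake_bucket actually exists. A minimal sketch of creating it programmatically (assuming it has not already been set through the UI, and that the key matches the name used in the template):

    from airflow.models import Variable

    # One-off setup: create the Variable the SQL template reads.
    # The key must match whatever follows var.value. in the template.
    Variable.set('datalake_bucket', 'my-bucket')

    With the Variable in place, the rendered template view for each unload task should show the real bucket name substituted into the TO 's3://...' path.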