Search code examples
amazon-web-servicesairflowaws-secrets-managermwaa

MWAA can retrieve variable by ID but not connection from AWS Secrets Manager


we've set up AWS SecretsManager as a secrets backend to Airflow (AWS MWAA) as described in their documentation. Unfortunately, nowhere is explained where the secrets are to be found and how they are to be used then. When I supply conn_id to a task in a DAG, we can see two errors in the task logs, ValueError: Invalid IPv6 URL and airflow.exceptions.AirflowNotFoundException: The conn_id redshift_conn isn't defined. What's even more surprising is that when retrieving variables stored the same way with Variable.get('my_variable_id'), it works just fine.

The question is: Am I wrongly expecting that the conn_id can be directly passed to operators as SomeOperator(conn_id='conn-id-in-secretsmanager')? Must I retrieve the connection manually each time I want to use it? I don't want to run something like read_from_aws_sm_fn in the code below every time beforehand...

Btw, neither the connection nor the variable show up in the Airflow UI.

Having stored a secret named airflow/connections/redshift_conn (and on the side one airflow/variables/my_variable_id), I expect the connection to be found and used when constructing RedshiftSQLOperator(task_id='mytask', redshift_conn_id='redshift_conn', sql='SELECT 1'). But this results in the above error. I am able to retrieve the redshift connection manually in a DAG with a separate task, but I think that is not how SecretsManager is supposed to be used in this case.

The example DAG is below:

from airflow import DAG, settings, secrets
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.models.baseoperator import chain
from airflow.models import Connection, Variable
from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator

from datetime import timedelta

sm_secret_id_name = f'airflow/connections/redshift_conn'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'retries': 1,
}

def read_from_aws_sm_fn(**kwargs):  # from AWS example code
    ### set up Secrets Manager
    hook = AwsBaseHook(client_type='secretsmanager')
    client = hook.get_client_type('secretsmanager')
    response = client.get_secret_value(SecretId=sm_secret_id_name)
    myConnSecretString = response["SecretString"]

    print(myConnSecretString[:15])

    return myConnSecretString

def get_variable(**kwargs):
    my_var_value = Variable.get('my_test_variable')
    print('variable:')
    print(my_var_value)
    return my_var_value

with DAG(
        dag_id=f'redshift_test_dag',
        default_args=default_args,
        dagrun_timeout=timedelta(minutes=10),
        start_date=days_ago(1),
        schedule_interval=None,
        tags=['example']
) as dag:
    read_from_aws_sm_task = PythonOperator(
        task_id="read_from_aws_sm",
        python_callable=read_from_aws_sm_fn,
        provide_context=True
    )  # works fine

    query_redshift = RedshiftSQLOperator(
        task_id='query_redshift',
        redshift_conn_id='redshift_conn',
        sql='SELECT 1;'
    )  # results in above errors :-(

    try_to_get_variable_value = PythonOperator(
        task_id='get_variable',
        python_callable=get_variable,
        provide_context=True
    )  # works fine!

Solution

  • The question is: Am I wrongly expecting that the conn_id can be directly passed to operators as SomeOperator(conn_id='conn-id-in-secretsmanager')? Must I retrieve the connection manually each time I want to use it? I don't want to run something like read_from_aws_sm_fn in the code below every time beforehand...

    Using secret manager as a backend, you don't need to change the way you use the connections or variables. They work the same way, when looking up a connection/variable, airflow follow a search path.

    Btw, neither the connection nor the variable show up in the Airflow UI.

    The connection/variable will not up in the UI.

    ValueError: Invalid IPv6 URL and airflow.exceptions.AirflowNotFoundException: The conn_id redshift_conn isn't defined

    The 1st error is related to the secret and the 2nd error is due to the connection not existing in the airflow UI.

    There is 2 formats to store connections in secret manager (depending on the aws provider version installed) the IPv6 URL error could be that its not parsing the connection correctly. Here is a link to the provider docs.