Tags: airflow, mwaa

MWAA not finding aws_default connection


I just set up AWS MWAA (Managed Workflows for Apache Airflow) and I'm playing around with running a simple bash script in a DAG. Reading the task logs, I noticed that by default the task looks for the aws_default connection and tries to use it, but doesn't find it.

I went to the Connections pane and set up the aws_default connection, but the logs still show the same messages:

Airflow Connection: aws_conn_id=aws_default

No credentials retrieved from Connection

*** Reading remote log from Cloudwatch log_group: airflow-mwaa-Task log_stream: dms-postgres-dialog-label-pg/start-replication-task/2021-11-22T13_00_00+00_00/1.log.
[2021-11-23 13:01:02,487] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,486] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
[2021-11-23 13:01:02,657] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,656] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
[2021-11-23 13:01:02,678] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,678] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=us-east-1
[2021-11-23 13:01:02,772] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,772] {{base_aws.py:157}} INFO - role_arn is None

How can I get MWAA to recognize this connection?

My DAG:

from datetime import datetime, timedelta
import pendulum

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

local_tz = pendulum.timezone("America/New_York")
start_date = datetime(2021, 11, 9, 8, tzinfo=local_tz)
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
with DAG(
    'dms-postgres-dialog-label-pg-test',
    default_args=default_args,
    description='',
    schedule_interval=timedelta(days=1),
    start_date=start_date,
    tags=['example'],
) as dag:

    t1 = BashOperator(
        task_id='start-replication-task',
        bash_command="""
        aws dms start-replication-task --replication-task-arn arn:aws:dms:us-east-1:blah --start-replication-task-type reload-target
        """,
    )

    t1

Edit: For now, I'm just using Airflow's built-in BaseHook to get the credentials. Example:

from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection('aws_service_account')
...

print(conn.host)
print(conn.login)
print(conn.password)
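
For completeness, here's a rough sketch of how those values could be handed to the aws CLI call via environment variables. The DAG id is just a placeholder, and I'm assuming the access key ID is stored as the connection's login and the secret as its password:

import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.operators.bash import BashOperator

# Pull the custom connection created in the Airflow UI (name is an assumption).
conn = BaseHook.get_connection('aws_service_account')

with DAG(
    'dms-start-replication-with-keys',
    start_date=datetime(2021, 11, 9),
    schedule_interval=timedelta(days=1),
) as dag:
    t1 = BashOperator(
        task_id='start-replication-task',
        bash_command=(
            "aws dms start-replication-task "
            "--replication-task-arn arn:aws:dms:us-east-1:blah "
            "--start-replication-task-type reload-target"
        ),
        # Keep the existing environment and add the CLI credential variables.
        env={
            **os.environ,
            'AWS_ACCESS_KEY_ID': conn.login,
            'AWS_SECRET_ACCESS_KEY': conn.password,
            'AWS_DEFAULT_REGION': 'us-east-1',
        },
    )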

Solution

  • Updating this as I just got off a call with AWS support.

    The execution role MWAA creates is used instead of an access key ID and secret in aws_default. To use a custom access key ID and secret, do as @Jonathan Porter recommends in the edit to his question:

    from airflow.hooks.base import BaseHook
    conn = BaseHook.get_connection('aws_service_account')
    ...
    
    print(conn.host)
    print(conn.login)
    print(conn.password)
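
    If you then need to hand those custom credentials to boto3 yourself, a minimal sketch looks like this (assuming the connection stores the access key ID as login and the secret access key as password):

    import boto3
    from airflow.hooks.base import BaseHook

    # Connection defined in the Airflow UI; the name and field layout are assumptions.
    conn = BaseHook.get_connection('aws_service_account')

    session = boto3.session.Session(
        aws_access_key_id=conn.login,
        aws_secret_access_key=conn.password,
        region_name='us-east-1',
    )
    dms = session.client('dms')  # use this client instead of shelling out to the aws CLI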
    

    However, if you want to use the execution role that MWAA provides, that is already the default within MWAA. Confusingly, the INFO messages state that no credentials were retrieved from the connection, but the execution role is still used, for example by something like the KubernetesPodOperator.

    [2021-11-23 13:01:02,487] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,486] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
    [2021-11-23 13:01:02,657] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,656] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
    [2021-11-23 13:01:02,678] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,678] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=us-east-1
    [2021-11-23 13:01:02,772] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,772] {{base_aws.py:157}} INFO - role_arn is None
    

    For example, the following DAG automatically uses the .aws/credentials that the execution role sets up in the MWAA environment:

    from datetime import timedelta
    from airflow import DAG
    from datetime import datetime
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
    
    default_args = {
        'owner': 'aws',
        'depends_on_past': False,
        'start_date': datetime(2019, 2, 20),
        'provide_context': True
    }
    
    dag = DAG(
        'kubernetes_pod_example', default_args=default_args, schedule_interval=None
    )
    
    # use a kube_config stored in the S3 dags folder for now
    kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'
    
    podRun = KubernetesPodOperator(
        namespace="mwaa",
        image="ubuntu:18.04",
        cmds=["bash"],
        arguments=["-c", "ls"],
        labels={"foo": "bar"},
        name="mwaa-pod-test",
        task_id="pod-task",
        get_logs=True,
        dag=dag,
        is_delete_operator_pod=False,
        config_file=kube_config_path,
        in_cluster=False,
        cluster_context='aws',
        execution_timeout=timedelta(seconds=60)
    )
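
    The same applies to the AWS provider hooks: leave aws_default without credentials and boto3 falls back to the execution role. A minimal sketch (the bucket name is just a placeholder):

    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    def list_bucket():
        # No keys configured anywhere; the MWAA execution role is used,
        # despite the "No credentials retrieved from Connection" message.
        hook = S3Hook(aws_conn_id='aws_default')
        return hook.list_keys(bucket_name='my-example-bucket')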
    

    Hope this helps anyone else stumbling around.