Search code examples
flaskairflowamazon-cognitokubernetes-helmairflow-webserver

Authorization for Airflow 2.0 and AWS Cognito using roles


I am deploying Airflow 2.3.0 with the official Helm chart and using AWS Cognito for authentication. So far this works with the following webserver_config.py:

import sys
from tokenize import group
from airflow import configuration as conf
from airflow.www.security import AirflowSecurityManager
from flask_appbuilder.security.manager import AUTH_OAUTH
import logging
import os
import json

# Timestamp-prefixed INFO output on the root logger. basicConfig() does
# nothing if the root logger already has handlers configured.
logging.basicConfig(
    format="%(asctime)s %(message)s",
    level=logging.INFO,
)
# No name is passed, so this is the root logger itself.
logger = logging.getLogger()

class CognitoSecurity(AirflowSecurityManager):
    """Security manager that looks up the logged-in user in AWS Cognito."""

    def oauth_user_info(self, provider, response=None):
        """Return the username and email of the user who just logged in.

        Args:
            provider: Name of the OAuth provider that handled the login. Only
                ``"aws_cognito"`` is supported.
            response: Result of the OAuth token exchange (presumably the
                token payload; it is only logged here).

        Returns:
            A dict with ``username`` and ``email`` keys taken from the
            ``oauth2/userInfo`` endpoint. An empty dict is returned when the
            provider is not Cognito, when ``response`` is empty, or when the
            userInfo request does not answer with HTTP 200.
        """
        if provider != "aws_cognito" or not response:
            return {}

        # The token response likely contains bearer tokens, so it is only
        # logged at DEBUG to keep credentials out of regular INFO logs.
        logger.debug(response)

        res = self.appbuilder.sm.oauth_remotes[provider].get('oauth2/userInfo')
        if res.status_code != 200:
            # Use the public ``text`` attribute: the response object has no
            # ``data`` attribute, so reading it would raise on this path.
            logger.error('Failed to obtain user info: %s', res.text)
            # Same "no user" value as the unsupported-provider branch, so
            # callers never have to handle None.
            return {}

        me = res.json()
        logger.info(" user_data: %s", me)
        return {"username": me.get("username"), "email": me.get("email")}


# --- Flask-AppBuilder authentication settings -------------------------------
AUTH_TYPE = AUTH_OAUTH
AUTH_ROLES_SYNC_AT_LOGIN = True  # Checks roles on every login
AUTH_USER_REGISTRATION = True
# NOTE(review): every self-registered user is given the "Admin" role. Switch
# to a lower-privilege role once group-to-role mapping is in place.
AUTH_USER_REGISTRATION_ROLE = "Admin"

# --- Cognito connection details, all taken from the environment -------------
# A missing variable raises KeyError when this file is loaded.
COGNITO_URL = os.environ['COGNITO_URL']
CONSUMER_KEY = os.environ['CONSUMER_KEY']
SECRET_KEY = os.environ['SECRET_KEY']
REDIRECT_URI = os.environ['REDIRECT_URI']
JWKS_URI = ("https://cognito-idp.%s.amazonaws.com/%s/.well-known/jwks.json"
            % (os.environ['AWS_REGION'], os.environ['COGNITO_POOL_ID']))


def _cognito_endpoint(path):
    """Return ``path`` appended to ``COGNITO_URL`` with a single ``/``.

    URLs are joined by hand because ``os.path.join`` uses the operating
    system's path separator, which is not ``/`` on every platform.
    """
    return "%s/%s" % (COGNITO_URL.rstrip('/'), path)


OAUTH_PROVIDERS = [{
    'name': 'aws_cognito',
    #'whitelist': ['@test.com'],  # optional
    'token_key': 'access_token',
    'url': COGNITO_URL,
    'icon': 'fa-amazon',
    'remote_app': {
        'client_id': CONSUMER_KEY,
        'client_secret': SECRET_KEY,
        'base_url': _cognito_endpoint('oauth2/idpresponse'),
        "api_base_url": COGNITO_URL,
        'redirect_uri': REDIRECT_URI,
        'jwks_uri': JWKS_URI,
        'client_kwargs': {
            'scope': 'email openid profile'
        },
        'access_token_url': _cognito_endpoint('oauth2/token'),
        'authorize_url': _cognito_endpoint('oauth2/authorize'),
    }
}]
# Use the Cognito-aware security manager defined in this file.
SECURITY_MANAGER_CLASS = CognitoSecurity

Now I am trying to retrieve the groups a user belongs to, so that I can map those groups to Airflow roles. This is where I am stuck. Can someone help me with this?


Solution

  • It turned out that the name of the method _azure_jwt_token_parse in the class BaseSecurityManager is misleading: the parse function is not Azure-specific and can therefore be used for other JWT tokens as well.

    With the following webserver_config.py I can map the AWS Cognito groups to Airflow roles:

    from signal import siginterrupt
    import sys
    from base64 import urlsafe_b64decode
    from airflow import configuration as conf
    from airflow.www.security import AirflowSecurityManager
    from flask import session
    from flask_appbuilder.security.manager import AUTH_OAUTH
    import jwt
    import logging
    import os
    import json
    
    # Timestamp-prefixed INFO output on the root logger. basicConfig() does
    # nothing if the root logger already has handlers configured.
    logging.basicConfig(
        format="%(asctime)s %(message)s",
        level=logging.INFO,
    )
    # No name is passed, so this is the root logger itself.
    logger = logging.getLogger()
    
    class CognitoSecurity(AirflowSecurityManager):
        """Security manager that reads the user and their groups from AWS Cognito."""

        def oauth_user_info(self, provider, response=None):
            """Return username, email and group names of the logged-in user.

            Args:
                provider: Name of the OAuth provider that handled the login.
                    Only ``"aws_cognito"`` is supported.
                response: Result of the OAuth token exchange. It must contain
                    an ``id_token`` entry, whose claims are decoded to find
                    the user's groups.

            Returns:
                A dict with ``username`` and ``email`` from the
                ``oauth2/userInfo`` endpoint and ``role_keys`` from the
                ``cognito:groups`` claim of the ID token (``["Public"]`` when
                the claim is absent). An empty dict is returned when the
                provider is not Cognito, when ``response`` is empty, or when
                the userInfo request does not answer with HTTP 200.

            Raises:
                KeyError: If ``response`` has no ``id_token`` entry.
            """
            if provider != "aws_cognito" or not response:
                return {}

            logger.debug(response)

            res = self.appbuilder.sm.oauth_remotes[provider].get('oauth2/userInfo')
            if res.status_code != 200:
                # Use the public ``text`` attribute: the response object has
                # no ``data`` attribute, so reading it would raise here.
                logger.error('Failed to obtain user info: %s', res.text)
                # Same "no user" value as the unsupported-provider branch, so
                # callers never have to handle None.
                return {}
            me = res.json()

            # Despite its name, this inherited helper is used here on the
            # Cognito ID token to obtain its claims.
            decoded_token = self._azure_jwt_token_parse(response["id_token"])
            logger.debug(" data: %s", decoded_token)
            return {
                "username": me.get("username"),
                "email": me.get("email"),
                "role_keys": decoded_token.get("cognito:groups", ["Public"]),
            }
    
    
    # --- Flask-AppBuilder authentication settings ---------------------------
    AUTH_TYPE = AUTH_OAUTH
    AUTH_ROLES_SYNC_AT_LOGIN = True  # Checks roles on every login
    AUTH_USER_REGISTRATION = True
    # Newly registered users start with the low-privilege "Public" role (not
    # "Admin"); further roles come from the group mapping below.
    AUTH_USER_REGISTRATION_ROLE = "Public"

    # Maps each Cognito group name (key) to the Airflow roles it grants.
    AUTH_ROLES_MAPPING = {
        "Viewer": ["User"],
        "Admin": ["Admin"],
    }

    # --- Cognito connection details, all taken from the environment ---------
    # A missing variable raises KeyError when this file is loaded.
    COGNITO_URL = os.environ['COGNITO_URL']
    CONSUMER_KEY = os.environ['CONSUMER_KEY']
    SECRET_KEY = os.environ['SECRET_KEY']
    REDIRECT_URI = os.environ['REDIRECT_URI']
    JWKS_URI = ("https://cognito-idp.%s.amazonaws.com/%s/.well-known/jwks.json"
                % (os.environ['AWS_REGION'], os.environ['COGNITO_POOL_ID']))


    def _cognito_endpoint(path):
        """Return ``path`` appended to ``COGNITO_URL`` with a single ``/``.

        URLs are joined by hand because ``os.path.join`` uses the operating
        system's path separator, which is not ``/`` on every platform.
        """
        return "%s/%s" % (COGNITO_URL.rstrip('/'), path)


    OAUTH_PROVIDERS = [{
        'name': 'aws_cognito',
        #'whitelist': ['@web.com'],  # optional
        'token_key': 'access_token',
        'url': COGNITO_URL,
        'icon': 'fa-amazon',
        'remote_app': {
            'client_id': CONSUMER_KEY,
            'client_secret': SECRET_KEY,
            'base_url': _cognito_endpoint('oauth2/idpresponse'),
            "api_base_url": COGNITO_URL,
            'redirect_uri': REDIRECT_URI,
            'jwks_uri': JWKS_URI,
            'client_kwargs': {
                'scope': 'email openid profile'
            },
            'access_token_url': _cognito_endpoint('oauth2/token'),
            'authorize_url': _cognito_endpoint('oauth2/authorize'),
        }
    }]
    # Use the Cognito-aware security manager defined in this file.
    SECURITY_MANAGER_CLASS = CognitoSecurity