Tags: python, google-cloud-platform, oauth, google-oauth, google-cloud-composer

Unable to trigger a Composer/Airflow DAG from a Cloud Function that fires on changes in a Cloud Storage bucket


I have created and run DAGs on a google-cloud-composer environment (dlkpipelinesv1: composer-1.13.0-airflow-1.10.12). I can trigger these DAGs manually and via the scheduler, but I am stuck when it comes to triggering them from Cloud Functions that detect changes in a Cloud Storage bucket.

Note that I had another Composer environment (pipelines: composer-1.7.5-airflow-1.10.2) that used the same Cloud Functions to trigger the relevant DAGs, and it was working.

I followed this guide to create the functions that trigger the DAGs, and retrieved the following variables:

PROJECT_ID = <project_id>
CLIENT_ID = <client_id_retrieved_by_running_the_code_in_the_guide_within_my_gcp_console>
WEBSERVER_ID = <airflow_webserver_id>
DAG_NAME = <dag_to_trigger>
WEBSERVER_URL = f"https://{WEBSERVER_ID}.appspot.com/api/experimental/dags/{DAG_NAME}/dag_runs"
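
(For reference, the CLIENT_ID above was retrieved roughly as the guide describes: an unauthenticated request to the Airflow web UI is redirected to the OAuth consent screen, and the IAP client ID sits in that redirect URL. A sketch of that retrieval, with the webserver URL as a placeholder:)

from urllib.parse import parse_qs, urlparse
import requests

AIRFLOW_UI_URL = "https://<WEBSERVER_ID>.appspot.com"  # the environment's "airflowUri"

# The unauthenticated request is redirected to the OAuth consent screen;
# the IAP client ID is embedded in the redirect URL's query string.
redirect_response = requests.get(AIRFLOW_UI_URL, allow_redirects=False)
redirect_location = redirect_response.headers["location"]
client_id = parse_qs(urlparse(redirect_location).query)["client_id"][0]
print(client_id)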


import logging

import requests
from google.auth.transport.requests import Request
from google.oauth2 import id_token


def file_listener(event, context):
    """Entry point of the cloud function: Triggered by a change to a Cloud Storage bucket.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    logging.info("Running the file listener process")
    logging.info(f"event : {event}")
    logging.info(f"context : {context}")
    file = event
    if file["size"] == "0" or "DTM_DATALAKE_AUDIT_COMPTAGE" not in file["name"] or ".filepart" in file["name"].lower():
        logging.info("no matching file")
        # Nothing to do for this object; returning ends the function cleanly.
        return

    logging.info(f"File listener detected the presence of : {file['name']}.")

    # id_token = authorize_iap()
    # make_iap_request({"file_name": file["name"]}, id_token)
    make_iap_request(url=WEBSERVER_URL, client_id=CLIENT_ID, method="POST")


def make_iap_request(url, client_id, method="GET", **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.

    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.

    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service account.
    open_id_connect_token = id_token.fetch_id_token(Request(), client_id)
    logging.info(f"Retrieved open id connect (bearer) token {open_id_connect_token}")

    # Fetch the Identity-Aware Proxy-protected URL, including an authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(method, url, headers={"Authorization": f"Bearer {open_id_connect_token}"}, **kwargs)

    if resp.status_code == 403:
        raise Exception("Service account does not have permission to access the IAP-protected application.")
    elif resp.status_code != 200:
        raise Exception(f"Bad response from application: {resp.status_code} / {resp.headers} / {resp.text}")
    else:
        logging.info(f"Response status - {resp.status_code}")
        return resp.json()
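
(Illustrative only, not part of the original function: since make_iap_request forwards **kwargs to requests.request, the call in file_listener could also pass the triggering file's name through to the DAG via the experimental API's conf field, along these lines:)

import json

make_iap_request(
    url=WEBSERVER_URL,
    client_id=CLIENT_ID,
    method="POST",
    # conf is sent as a JSON-encoded string; the experimental dag_runs endpoint decodes it.
    json={"conf": json.dumps({"file_name": file["name"]})},
)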

This is the code that runs in the Cloud Functions. I have checked the environment details of dlkpipelinesv1 and pipelines respectively, using this code:

import google.auth
import google.auth.transport.requests

credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
composer_response = authed_session.request('GET', environment_url)
environment_data = composer_response.json()

Both environments run with the same service accounts, i.e. the same IAM roles. However, I have noticed the following differences:

In the old environment :

"airflowUri": "https://p5<hidden_value>-tp.appspot.com",
    "privateEnvironmentConfig": { "privateClusterConfig": {} },

in the new environment:

"airflowUri": "https://da<hidden_value>-tp.appspot.com",
    "privateEnvironmentConfig": {
      "privateClusterConfig": {},
      "webServerIpv4CidrBlock": "<hidden_value>",
      "cloudSqlIpv4CidrBlock": "<hidden_value>"
    }

The service account that I use to make the post request has the following roles :

Cloud Functions Service Agent 
Composer Administrator 
Composer User
Service Account Token Creator 
Service Account User

The service account that runs my composer environment has the following roles :

BigQuery Admin
Composer Worker
Service Account Token Creator
Storage Object Admin

But I am still receiving a 403 Forbidden in the Logs Explorer when the POST request is made to the Airflow API.
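
(A small debugging sketch that can help with a 403 like this: decode the OIDC token locally, without verifying the signature, just to confirm which identity and audience it carries. It reuses open_id_connect_token from make_iap_request above.)

from google.auth import jwt

# "email" is the identity the token represents; "aud" should match CLIENT_ID.
claims = jwt.decode(open_id_connect_token, verify=False)
logging.info(f"token email: {claims.get('email')}, audience: {claims.get('aud')}")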

EDIT 2020-11-16 :

I've updated to the latest make_iap_request code. I tinkered with IAP in the security service, but I cannot find the web server that will accept HTTP POST requests from my Cloud Functions... See the image below. I added the service account to the default and CRM IAP resources just in case, but I still get this error:

Exception: Service account does not have permission to access the IAP-protected application.

The main question is: which IAP is at stake here, and how do I add my service account as a user of that IAP?

What am I missing?

List of HTTP IAP


Solution

  • There is a configuration parameter that causes ALL requests to the API to be denied...

    The documentation mentions that we need to override the following Airflow configuration:

    [api]
    auth_backend = airflow.api.auth.backend.deny_all
    

    into

    [api]
    auth_backend = airflow.api.auth.backend.default
    

    This detail is really important to know, and it is not mentioned in Google's documentation... One way to apply the override is sketched below, after the links.

    Useful links :

    1. Triggering DAGS (workflows) with GCS
    2. make_iap_request.py repository
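
    For completeness, a sketch of one way to apply that override programmatically, reusing the authed_session pattern from the question (project, location and environment name below are placeholders). The equivalent gcloud command would be gcloud composer environments update <env> --location <location> --update-airflow-configs=api-auth_backend=airflow.api.auth.backend.default.

    import google.auth
    import google.auth.transport.requests

    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

    project_id = "<project_id>"
    location = "<location>"
    composer_environment = "<composer_environment_name>"

    environment_url = (
        "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
        "/environments/{}").format(project_id, location, composer_environment)

    # Patch only this one Airflow config override; the key format is "<section>-<key>".
    response = authed_session.request(
        "PATCH",
        environment_url
        + "?updateMask=config.softwareConfig.airflowConfigOverrides.api-auth_backend",
        json={"config": {"softwareConfig": {"airflowConfigOverrides": {
            "api-auth_backend": "airflow.api.auth.backend.default"}}}},
    )
    print(response.status_code, response.json())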