Search code examples
google-cloud-platformairflowgoogle-cloud-composergoogle-iam

ERROR - 'Credentials' object has no attribute 'signer_email'


I have a gRPC service deployed on Google Cloud Run which I want to call from Composer.

I have assigned the roles/iam.serviceAccountTokenCreator role to the service account which my composer worker nodes are running under, and I'm not mounting any custom service key files or setting the GOOGLE_APPLICATION_CREDENTIALS environment variable.

Using the JWT_GOOGLE authentication option in the airflow gRPC hook I get the following error:

[2022-05-31 14:20:16,082] {grpc.py:90} INFO - Calling gRPC service
[2022-05-31 14:20:16,097] {taskinstance.py:1152} ERROR - 'Credentials' object has no attribute 'signer_email'
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/providers/grpc/operators/grpc.py", line 95, in execute
    for response in responses:
  File "/usr/local/lib/airflow/airflow/providers/grpc/hooks/grpc.py", line 136, in run
    with self.get_conn() as channel:
  File "/usr/local/lib/airflow/airflow/providers/grpc/hooks/grpc.py", line 104, in get_conn
    jwt_creds = google_auth_jwt.OnDemandCredentials.from_signing_credentials(credentials)
  File "/opt/python3.6/lib/python3.6/site-packages/google/auth/jwt.py", line 695, in from_signing_credentials
    kwargs.setdefault("issuer", credentials.signer_email)
AttributeError: 'Credentials' object has no attribute 'signer_email'
[2022-05-31 14:20:16,100] {taskinstance.py:1196} INFO - Marking task as FAILED. dag_id=example_dag, task_id=example_task, execution_date=20220531T135709, start_date=20220531T142015, end_date=20220531T142016
[2022-05-31 14:20:23,826] {local_task_job.py:102} INFO - Task exited with return code 1

Does anyone have any idea how/why my credentials aren't including the field I need?


Solution

  • Found a solution to this after discussing with Google Cloud - essentially, it looks like the JWT_GOOGLE authentication method isn't set up for GCE service accounts so I went down the CUSTOM authentication route instead:

    import google.auth.transport.grpc
    import google.auth.transport.requests
    import google.oauth2.credentials
    import google.oauth2.id_token
    from airflow.providers.grpc.operators.grpc import GrpcOperator
    
    def connection_func(conn):
        """Custom connection function for gRPC authentication.
    
        Args:
            conn: Airflow Connection object
    
        Returns:
            An instantiated gRPC channel for making calls to our remote service.
        """
    
        request = google.auth.transport.requests.Request()
    
        if not str(conn.host).startswith("https://"):
            audience = f"https://{conn.host}"
        else:
            audience = conn.host
    
        token = google.oauth2.id_token.fetch_id_token(request, audience)
        creds = google.oauth2.credentials.Credentials(token)
        base_url = conn.host
        if conn.port:
            base_url = f"{base_url}:{conn.port}"
        channel = google.auth.transport.grpc.secure_authorized_channel(
            creds, None, base_url
        )
        return channel
    
    
    return GrpcOperator(
            ...
            custom_connection_func=connection_func,
        )
    
    

    This uses the approach seen here to fetch an ID token for a given audience, then create a set of credentials from there and finally instantiate the gRPC secure channel for use in the operator.