I have a gRPC service deployed on Google Cloud Run which I want to call from Composer.
I have assigned the roles/iam.serviceAccountTokenCreator role to the service account that my Composer worker nodes run under, and I'm not mounting any custom service key files or setting the GOOGLE_APPLICATION_CREDENTIALS environment variable.
Using the JWT_GOOGLE authentication option in the Airflow gRPC hook, I get the following error:
[2022-05-31 14:20:16,082] {grpc.py:90} INFO - Calling gRPC service
[2022-05-31 14:20:16,097] {taskinstance.py:1152} ERROR - 'Credentials' object has no attribute 'signer_email'
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/providers/grpc/operators/grpc.py", line 95, in execute
    for response in responses:
  File "/usr/local/lib/airflow/airflow/providers/grpc/hooks/grpc.py", line 136, in run
    with self.get_conn() as channel:
  File "/usr/local/lib/airflow/airflow/providers/grpc/hooks/grpc.py", line 104, in get_conn
    jwt_creds = google_auth_jwt.OnDemandCredentials.from_signing_credentials(credentials)
  File "/opt/python3.6/lib/python3.6/site-packages/google/auth/jwt.py", line 695, in from_signing_credentials
    kwargs.setdefault("issuer", credentials.signer_email)
AttributeError: 'Credentials' object has no attribute 'signer_email'
[2022-05-31 14:20:16,100] {taskinstance.py:1196} INFO - Marking task as FAILED. dag_id=example_dag, task_id=example_task, execution_date=20220531T135709, start_date=20220531T142015, end_date=20220531T142016
[2022-05-31 14:20:23,826] {local_task_job.py:102} INFO - Task exited with return code 1
Does anyone have any idea how/why my credentials aren't including the field I need?
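For anyone wanting to confirm what the traceback suggests, a small duck-typed check can show whether a credentials object exposes the attributes that from_signing_credentials reads. This is only a sketch: the helper name supports_local_signing is made up for illustration, and the attribute names are taken from the traceback line above and from the signing interface in google-auth.

```python
def supports_local_signing(credentials) -> bool:
    """Return True if the credentials expose the signing attributes.

    google.auth.jwt.OnDemandCredentials.from_signing_credentials reads
    credentials.signer_email (see the traceback) and credentials.signer,
    so an object lacking them fails with the AttributeError shown above.
    """
    return hasattr(credentials, "signer_email") and hasattr(credentials, "signer")
```

Calling this inside a Composer task with the first element of the tuple returned by google.auth.default() should show whether the worker's default credentials are usable with JWT_GOOGLE.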
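I found a solution to this after discussing it with Google Cloud. Essentially, it looks like the JWT_GOOGLE authentication method isn't set up for GCE service accounts: their credentials apparently can't sign tokens locally, which would explain the missing signer_email attribute. I went down the CUSTOM authentication route instead: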
import google.auth.transport.grpc
import google.auth.transport.requests
import google.oauth2.credentials
import google.oauth2.id_token
from airflow.providers.grpc.operators.grpc import GrpcOperator


def connection_func(conn):
    """Custom connection function for gRPC authentication.

    Args:
        conn: Airflow Connection object

    Returns:
        An instantiated gRPC channel for making calls to our remote service.
    """
    request = google.auth.transport.requests.Request()
    # The ID token audience is the https:// URL of the Cloud Run service.
    if not str(conn.host).startswith("https://"):
        audience = f"https://{conn.host}"
    else:
        audience = conn.host
    # Fetch an ID token for that audience (from the metadata server when
    # no service account key file is configured).
    token = google.oauth2.id_token.fetch_id_token(request, audience)
    creds = google.oauth2.credentials.Credentials(token)
    base_url = conn.host
    if conn.port:
        base_url = f"{base_url}:{conn.port}"
    channel = google.auth.transport.grpc.secure_authorized_channel(
        creds, None, base_url
    )
    return channel


# In the code that builds the task (elided here):
return GrpcOperator(
    ...
    custom_connection_func=connection_func,
)
This uses the approach seen here to fetch an ID token for a given audience, creates a set of credentials from that token, and finally instantiates the gRPC secure channel for use in the operator.
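One detail that may be worth guarding against: if the connection host is stored with an https:// prefix, connection_func uses it correctly as the audience but also passes it, scheme included, as the channel target, whereas gRPC targets are normally written as host:port. A minimal sketch of deriving both values separately, assuming the host is either a bare hostname or an https:// URL (the helper name split_audience_and_target is hypothetical):

```python
from typing import Optional, Tuple


def split_audience_and_target(host: str, port: Optional[int] = None) -> Tuple[str, str]:
    """Return (audience, target) for a given connection host and port.

    audience: https:// URL to request the ID token for.
    target:   scheme-less host[:port] string for the gRPC channel.
    """
    scheme = "https://"
    # Drop the scheme if present, plus any trailing slash.
    bare_host = host[len(scheme):] if host.startswith(scheme) else host
    bare_host = bare_host.rstrip("/")
    audience = f"{scheme}{bare_host}"
    target = f"{bare_host}:{port}" if port else bare_host
    return audience, target
```

Inside connection_func, the two returned values would take the place of the audience and base_url variables.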