Tags: python, firebase, google-cloud-platform, firebase-authentication, google-identity-toolkit

GCP signed JWT kid does not exist (disappeared) from googleapis.com/robot/v1/metadata/x509/my-service@**-**.iam.gserviceaccount.com, JWT is invalid


We have a flow where we sign JWTs using the Python GCP IAMCredentialsClient in the following way:

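    # Imports this method relies on
    import json
    import time

    from google.cloud.iam_credentials_v1 import IAMCredentialsClient

    def sign_jwt(self, payload, aud=None):
        """
        Sign a JWT with the service account's Google-managed key.

        @param payload: dict of claims to sign (updated in place)
        @param aud: audience claim; defaults to Identity.JWT_AUDIENCE
        @return: the signed JWT as a string
        """
        self.logger.info("Signing jwt")
        payload.update({
            "iss": self.service_account_email,
            "aud": aud if aud else Identity.JWT_AUDIENCE,
            "iat": int(time.time()),
        })
        self.logger.info("Payload", extra=payload)
        # Create the IAM Credentials client lazily and reuse it
        if not self.signer:
            self.signer = IAMCredentialsClient()
        signed_jwt_resp = self.signer.sign_jwt(name=self.signer_name, payload=json.dumps(payload))
        self.logger.info("Jwt signed successfully")
        return signed_jwt_resp.signed_jwt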

We later use the signed JWT in an internal exchange mechanism, and we rely on GCP's validation capabilities to make that mechanism work. The validation process looks like this:

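    def _get_certificates(self, service_account):
        """
        Fetch the public x509 certificates published for a service account.

        @param service_account: service account email
        @return: dict mapping key ID (kid) to PEM-encoded certificate
        """
        return requests.get(f"https://www.googleapis.com/robot/v1/metadata/x509/{service_account}").json()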

    def _get_jwt_pkey(self, iss, kid):
        """
        Return the PEM-encoded public key for the given issuer and key ID.

        @param iss: issuer claim (service account email)
        @param kid: key ID from the JWT header
        @return: public key as PEM bytes
        """
        if not iss.endswith(f"@{self.app.project_id}.iam.gserviceaccount.com"):
            raise Exception("Invalid issuer")
        # certificates are cached per issuer and not refreshed after the first fetch
        if iss not in self.certificates:
            self.certificates[iss] = self._get_certificates(iss)

        # this is None when the kid is not among the cached certificates
        certificate = self.certificates.get(iss).get(kid)
        # load the public key from the certificate
        public_key = load_pem_x509_certificate(certificate.encode()).public_key()
        # serialize the public key to PEM
        return public_key.public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)

    def verify_jwt(self, jwt):
        """
        Verify a signed JWT against the issuer's published certificates.

        @param jwt: the signed JWT as a string
        @return: the decoded claims
        """
        try:
            # extract metadata [NO VERIFICATION]
            header, payload, signed_section, signature = google_unverified_decode(jwt)
            # get appropriate public key
            public_key = self._get_jwt_pkey(payload.get("iss"), header.get("kid"))

            # actually verify the token
            decoded_token = pyjwt.decode(jwt, key=public_key, algorithms=["RS256"], audience=Identity.JWT_AUDIENCE)
            return decoded_token
        except Exception:
            self.logger.exception("Failed to verify jwt")
            raise

This works well for a while, but then, all of a sudden, the signed JWT's kid no longer exists in https://www.googleapis.com/robot/v1/metadata/x509/my-service@**-**.iam.gserviceaccount.com (which is fetched by the _get_certificates method). We can therefore no longer verify the JWT, and it becomes "useless".
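
To confirm this symptom for a given token, the kid in its header can be compared with the key IDs currently published at that URL. The following is only a diagnostic sketch: `unverified_header` and `kid_is_published` are hypothetical helper names, and `published_certs` is assumed to be the dict returned by `_get_certificates`. Nothing here verifies the signature.

```python
import base64
import json


def unverified_header(token: str) -> dict:
    """Decode the JWT header WITHOUT verifying the signature (diagnostics only)."""
    header_b64 = token.split(".")[0]
    # JWT segments are base64url without padding; restore it before decoding.
    padded = header_b64 + "=" * (-len(header_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def kid_is_published(token: str, published_certs: dict) -> bool:
    """Return True if the token's kid is among the currently published key IDs."""
    return unverified_header(token).get("kid") in published_certs
```

If `kid_is_published(token, self._get_certificates(iss))` is false for a token that verified earlier, the certificate was removed from the endpoint, rather than just missing from the local `self.certificates` cache.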

There are a few questions to ask here:

  1. Why did those keys in https://www.googleapis.com/robot/v1/metadata/x509/my-service@**-**.iam.gserviceaccount.com rotate?
  2. When does Google rotate managed service account keys?
  3. How can we overcome this problem?
  4. Where can I find documentation about this?

Thanks in advance!


Solution

  • Google Cloud rotates Google-managed certificate keys about every 12 hours.

    If the elapsed time from signing to validating is more than 12 hours, that might be your problem.

    The only public reference that I am aware of is here

    That link is for Google Cloud Storage, but the key rotation detail applies. SignBlob and SignJwt are basically the same API service.

    Solution:

    You can sign the JWT yourself with your service account's private key, so that you are not subject to the rotation of Google-managed keys. Most Python JWT libraries can do this.

    Example:

    import json

    import jwt

    # Load the Google service account credentials from a JSON key file
    # (filename, payload and headers are defined elsewhere)
    with open(filename, 'r') as f:
        json_cred = json.load(f)
    pkey = json_cred['private_key']

    ...

    # Encode the headers and payload and sign them, creating a signed JWT (JWS)
    signed_jwt = jwt.encode(payload, pkey, algorithm="RS256", headers=headers)
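
To check whether the elapsed time mentioned above is a plausible cause, the token's age can be read from its `iat` claim before verification. This is a rough sketch, not an official check: `ASSUMED_ROTATION_SECONDS` simply encodes the "about every 12 hours" figure from this answer, and the function names are hypothetical. The payload is decoded without verifying the signature, so the result is only a diagnostic hint.

```python
import base64
import json
import time

# Assumption: taken from the "about every 12 hours" figure in this answer,
# not from an official constant.
ASSUMED_ROTATION_SECONDS = 12 * 60 * 60


def unverified_claims(token: str) -> dict:
    """Decode the JWT payload WITHOUT verifying the signature (diagnostics only)."""
    payload_b64 = token.split(".")[1]
    # JWT segments are base64url without padding; restore it before decoding.
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def token_age_seconds(token: str, now=None):
    """Seconds elapsed since the token's iat claim."""
    now = time.time() if now is None else now
    return now - unverified_claims(token)["iat"]


def may_have_outlived_key(token: str, now=None, max_age=ASSUMED_ROTATION_SECONDS) -> bool:
    """True if the token is older than the assumed rotation interval."""
    return token_age_seconds(token, now) > max_age
```

Logging `token_age_seconds(jwt)` next to the "Failed to verify jwt" message would show whether failures line up with tokens older than that interval.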