Search code examples
google-cloud-platformoauth-2.0google-oauthservice-accountsgoogle-iam

How to use OAuth 2.0 for Server to Server Applications


I would like to obtain an access token. I read the documentation at the URL below and found that the token request must be a JWT that I sign myself.

I have a JSON key file like this:

{
  "type": "service_account",
  "project_id": "xxxxxxxxxxxxxxx",
  "private_key_id": "xxxxxxxxxxxxxxxxxxx",
  "private_key": "-----BEGIN PRIVATE KEY-----\nxxxxxx\n-----END PRIVATE KEY-----\n",
  "client_email": "xxxxx@xxxxxxxxxxx.iam.gserviceaccount.com",
  "client_id": "xxxxxxxxxxx",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/xxxxxxxxxxx.iam.gserviceaccount.com"
}

But verifying an RS256 signature requires a public key. Where can I get it? I want to verify the JWT with JWT.io, but the result is "Invalid Signature". The following value, which I obtained by accessing client_x509_cert_url, did not work as the public key:

-----BEGIN CERTIFICATE-----\nxxxxxxxxx\n-----END CERTIFICATE-----\n"

https://developers.google.com/identity/protocols/oauth2/service-account#httprest


Solution

  • The field client_x509_cert_url holds a URL that returns multiple certificates, keyed by key ID. Each certificate contains a public key. Select the certificate whose key ID matches private_key_id, then extract the public key from it.

    I wrote the following code in 2018 to demonstrate how to create a Signed JWT using a service account and then verify the Signed JWT using Google public certificates. This example supports the Python OpenSSL and Cryptography libraries.

    Once you have created a Signed JWT, you must exchange that for an Access Token. My website has articles that detail that step also.

    '''
    This program creates and verifies a Signed JWT using the public certificate
    '''
    
    import time
    import json
    import base64
    import jwt
    import requests
    
    # This example supports both libraries. Only one is required.
    
    import OpenSSL.crypto
    
    from cryptography.x509 import load_pem_x509_certificate
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization
    
    
    # Library used by load_public_key: True selects pyOpenSSL,
    # False selects cryptography.
    use_pyopenssl = True

    # Service account Json key file read by the script
    json_filename = "service-account.json"

    # Google Endpoint for creating OAuth 2.0 Access Tokens from Signed-JWT
    auth_url = "https://www.googleapis.com/oauth2/v4/token"

    # Lifetime of the token in seconds (60 * 60 = 1 hour)
    expires_in = 60 * 60

    scopes = "https://www.googleapis.com/auth/cloud-platform"

    # Checks performed when the JWT is decoded. Switch an entry off to relax
    # a check, for example 'verify_exp' = False accepts expired JWTs.
    options = dict(
        verify_signature=True,
        verify_exp=True,
        verify_nbf=True,
        verify_iat=True,
        verify_aud=True,
        require_exp=False,
        require_iat=False,
        require_nbf=False,
    )

    # Audience expected when the Signed JWT is verified
    aud = "https://www.googleapis.com/oauth2/v4/token"
    
    def load_private_key(json_cred):
        ''' Look up the 'private_key' entry of the parsed Json credentials '''

        private_key = json_cred['private_key']
        return private_key
    
    def load_public_key(cert):
        '''
        Return the PEM encoded public key stored in a PEM certificate.

        The module-level flag use_pyopenssl decides which library does the
        work: pyOpenSSL when it is true, cryptography otherwise.
        '''

        if not use_pyopenssl:
            # cryptography: the loader is given the certificate as UTF-8 bytes
            certificate = load_pem_x509_certificate(
                cert.encode('utf-8'),
                default_backend())

            return certificate.public_key().public_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PublicFormat.SubjectPublicKeyInfo)

        # pyOpenSSL: load the certificate, then dump its public key as PEM
        pem = OpenSSL.crypto.FILETYPE_PEM
        certificate = OpenSSL.crypto.load_certificate(pem, cert)

        return OpenSSL.crypto.dump_publickey(pem, certificate.get_pubkey())
    
    def load_json_credentials(filename):
        '''
        Load the Google Service Account Credentials from Json file.

        filename: path of the service account Json key file.

        Returns the parsed Json document.

        Raises OSError when the file cannot be opened and ValueError
        (json.JSONDecodeError or UnicodeDecodeError) when its content is not
        valid UTF-8 encoded Json.
        '''

        # Read as UTF-8 explicitly so the result does not depend on the
        # platform's default text encoding.
        with open(filename, 'r', encoding='utf-8') as f:
            return json.load(f)
    
    def load_public_certificates(url, timeout=10):
        '''
        Load the public certificates for the client email address.

        url: address the certificates are downloaded from (the script passes
             the 'client_x509_cert_url' value of the credentials).
        timeout: seconds to wait for the server. Without a timeout
                 requests.get() can block indefinitely on an unresponsive
                 server.

        Returns the decoded Json response body, or None when the HTTP status
        is not 200. Network failures and timeouts surface as exceptions
        raised by requests.
        '''

        r = requests.get(url, timeout=timeout)

        if r.status_code != 200:
            return None

        return json.loads(r.content.decode('utf-8'))
    
    def create_signed_jwt(pkey, pkey_id, email, scope):
        '''
        Create a Signed JWT (JWS) for a service account.

        pkey: private key used to sign the token
        pkey_id: id of the private key, placed in the 'kid' header
        email: service account email, used as both issuer and subject
        scope: permissions requested, stored in the 'scope' claim

        The audience is the module-level auth_url and the lifetime is the
        module-level expires_in (seconds). Returns whatever jwt.encode returns.
        '''

        now = int(time.time())

        # Note: this token expires and cannot be refreshed. The token must be recreated
        claims = {
            "iss": email,               # Issuer claim
            "sub": email,               # Subject claim
            "aud": auth_url,            # Audience claim
            "iat": now,                 # Issued At claim
            "exp": now + expires_in,    # Expire time, expires_in is in seconds
            "scope": scope              # Permissions
        }

        jose_headers = {
            'kid': pkey_id,
            "alg": "RS256",             # Google uses SHA256withRSA
            "typ": "JWT"
        }

        # Encode the headers and payload and sign them with the private key
        return jwt.encode(claims, pkey, algorithm="RS256", headers=jose_headers)
    
    def pad(data):
        """
        Pad a base64 string with '=' up to a multiple of four characters.

        data: base64 text, possibly with its trailing padding stripped (as in
              the segments of a JWT).

        Returns data unchanged when its length is already a multiple of four,
        otherwise data followed by the missing '=' characters.
        """

        # -len(data) % 4 is the distance to the next multiple of four and is 0
        # for an already aligned string, so nothing is appended in that case.
        return data + '=' * (-len(data) % 4)
    
    def print_jwt(signed_jwt):
        """
        Print a JWT Header and Payload.

        signed_jwt: the Signed JWT as bytes or str. jwt.encode() returns bytes
                    in PyJWT 1.x and str in PyJWT 2.x, so both are accepted.

        The signature segment is not printed and nothing is verified here.
        """

        # Only bytes need decoding; calling .decode() on a str raises
        # AttributeError.
        if isinstance(signed_jwt, bytes):
            signed_jwt = signed_jwt.decode('utf-8')

        s = signed_jwt.split('.')

        print('Header:')
        h = base64.urlsafe_b64decode(pad(s[0])).decode('utf-8')
        print(json.dumps(json.loads(h), indent=4))

        print('Payload:')
        p = base64.urlsafe_b64decode(pad(s[1])).decode('utf-8')
        print(json.dumps(json.loads(p), indent=4))
    
    def verify_signed_jwt(signed_jwt, pub_key):
        '''
        Verify a Signed JWT with a public key and print the decoded claims.

        Only RS256 is accepted. The expected audience and the checks to run
        come from the module-level aud and options. Errors raised by
        jwt.decode are not caught here.
        '''

        claims = jwt.decode(
            signed_jwt,
            pub_key,
            algorithms=["RS256"],
            audience=aud,
            options=options)

        print('Decoded JWT:')
        print(json.dumps(claims, indent=4))
    
    def get_public_key(json_cred):
        '''
        Load the public certificates for the service account email address.
        Then use the private_key_id to find the correct certificate.

        json_cred: parsed service account credentials; the entries
                   'client_x509_cert_url' and 'private_key_id' are read.

        Returns the public key extracted from the matching certificate, or
        None when the certificates could not be downloaded or none of them
        is stored under the private_key_id.
        '''

        certs = load_public_certificates(json_cred['client_x509_cert_url'])

        # load_public_certificates returns None on a non-200 response;
        # iterating over None would raise TypeError.
        if not certs:
            return None

        # The certificates are keyed by key id, so a direct lookup replaces
        # scanning every key.
        cert = certs.get(json_cred['private_key_id'])

        if cert is None:
            return None

        return load_public_key(cert)
    
    if __name__ == '__main__':
        # Create a Signed JWT from the service account key, print it, then
        # verify it with the matching Google public certificate.
        cred = load_json_credentials(json_filename)

        private_key = load_private_key(cred)

        # get_public_key downloads the certificates itself, so no separate
        # download is made here.
        public_key = get_public_key(cred)

        if public_key is None:
            print('Error: Cannot get public key')
            # exit() is a helper added by the site module and may be missing;
            # raising SystemExit ends the script with the same status code.
            raise SystemExit(1)

        s_jwt = create_signed_jwt(
                private_key,
                cred['private_key_id'],
                cred['client_email'],
                scopes)

        print_jwt(s_jwt)

        verify_signed_jwt(s_jwt, public_key)