I would like to get access_token. I read the following URL and found that it needs to be signed by JWT.
i get json key like this.
{
"type": "service_account",
"project_id": "xxxxxxxxxxxxxxx",
"private_key_id": "xxxxxxxxxxxxxxxxxxx",
"private_key": "-----BEGIN PRIVATE KEY-----\nxxxxxx\n-----END PRIVATE KEY-----\n",
"client_email": "xxxxxxx@xxxxxxxxx.iam.gserviceaccount.com",
"client_id": "xxxxxxxxxxx",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/xxxxxxxxxxx.iam.gserviceaccount.com"
}
But RSA256 signing requires a public key, where can I get it? I want to verify with JWT.io, but result is Invalid Signature. The following, which was obtained by accessing client_x509_cert_url, was useless.
-----BEGIN CERTIFICATE-----\nxxxxxxxxx\n-----END CERTIFICATE-----\n"
https://developers.google.com/identity/protocols/oauth2/service-account#httprest
The field client_x509_cert_url
contains a URL which contains multiple certificates. Each certificate contains a public key. Select the correct certificate based on the private_key_id
. Then extract the public key.
I wrote the following code in 2018 to demonstrate how to create a Signed JWT using a service account and then verify the Signed JWT using Google public certificates. This example supports the Python OpenSSL and Cryptography libraries.
Once you have created a Signed JWT, you must exchange that for an Access Token. My website has articles that detail that step also.
'''
This program creates and verifies a Signed JWT using the public certificate
'''
import time
import json
import base64
import jwt
import requests
# This example supports both libraries. Only one is required.
import OpenSSL.crypto
from cryptography.x509 import load_pem_x509_certificate
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
use_pyopenssl = True
# use_pyopenssl = False
json_filename = 'service-account.json'
# Google Endpoint for creating OAuth 2.0 Access Tokens from Signed-JWT
auth_url = "https://www.googleapis.com/oauth2/v4/token"
# Set how long this token will be valid in seconds
expires_in = 3600 # Expires in 1 hour
scopes = "https://www.googleapis.com/auth/cloud-platform"
# You can control what is verified in the JWT. For example to allow expired JWTs
# set 'verify_exp' to False
options = {
'verify_signature': True,
'verify_exp': True,
'verify_nbf': True,
'verify_iat': True,
'verify_aud': True,
'require_exp': False,
'require_iat': False,
'require_nbf': False
}
aud = 'https://www.googleapis.com/oauth2/v4/token'
def load_private_key(json_cred):
''' Return the private key from the json credentials '''
return json_cred['private_key']
def load_public_key(cert):
''' Extract the public key from the certificate '''
if use_pyopenssl:
obj = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM,
cert)
pub_key = OpenSSL.crypto.dump_publickey(
OpenSSL.crypto.FILETYPE_PEM,
obj.get_pubkey())
# print('Public Key (pyOpenSSL)')
# print(pub_key)
return pub_key
# print('Load certificate')
cert_obj = load_pem_x509_certificate(cert.encode('utf-8'), default_backend())
# print('Get Public Key')
pub_obj = cert_obj.public_key()
# print(pub_obj)
pub_key = pub_obj.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
# print('Public Key (cryptography)')
# print(pub_key)
return pub_key
def load_json_credentials(filename):
''' Load the Google Service Account Credentials from Json file '''
# print('Opening:', filename)
with open(filename, 'r') as f:
data = f.read()
return json.loads(data)
def load_public_certificates(url):
''' Load the public certificates for the client email address '''
r = requests.get(url)
if r.status_code != 200:
return None
return json.loads(r.content.decode('utf-8'))
def create_signed_jwt(pkey, pkey_id, email, scope):
''' Create an AccessToken from a service account Json credentials file '''
issued = int(time.time())
expires = issued + expires_in # expires_in is in seconds
# Note: this token expires and cannot be refreshed. The token must be recreated
# JWT Headers
additional_headers = {
'kid': pkey_id,
"alg": "RS256",
"typ": "JWT" # Google uses SHA256withRSA
}
# JWT Payload
payload = {
"iss": email, # Issuer claim
"sub": email, # Issuer claim
"aud": auth_url, # Audience claim
"iat": issued, # Issued At claim
"exp": expires, # Expire time
"scope": scope # Permissions
}
# Encode the headers and payload and sign creating a Signed JWT (JWS)
sig = jwt.encode(payload, pkey, algorithm="RS256", headers=additional_headers)
# print(sig)
return sig
def pad(data):
""" pad base64 string """
missing_padding = len(data) % 4
data += '=' * (4 - missing_padding)
return data
def print_jwt(signed_jwt):
""" Print a JWT Header and Payload """
s = signed_jwt.decode('utf-8').split('.')
print('Header:')
h = base64.urlsafe_b64decode(pad(s[0])).decode('utf-8')
print(json.dumps(json.loads(h), indent=4))
print('Payload:')
p = base64.urlsafe_b64decode(pad(s[1])).decode('utf-8')
print(json.dumps(json.loads(p), indent=4))
def verify_signed_jwt(signed_jwt, pub_key):
'''
This function takes a Signed JWT and verifies it using a Google Json service account.
'''
# Verify the Signed JWT
r = jwt.decode(signed_jwt, pub_key, algorithms=["RS256"], audience=aud, options=options)
print('Decoded JWT:')
print(json.dumps(r, indent=4))
def get_public_key(json_cred):
'''
Load the public certificates for the service account email address.
Then compare the private_key_id to find the correct certificate.
'''
certs = load_public_certificates(json_cred['client_x509_cert_url'])
for k in certs:
if k == json_cred['private_key_id']:
cert = certs[k]
pub = load_public_key(cert)
return pub
return None
if __name__ == '__main__':
cred = load_json_credentials(json_filename)
pub_certs = load_public_certificates(cred['client_x509_cert_url'])
private_key = load_private_key(cred)
# print('Private Key:')
# print(private_key)
public_key = get_public_key(cred)
# print('Public Key:')
# print(public_key)
if public_key is None:
print('Error: Cannot get public key')
exit(1)
s_jwt = create_signed_jwt(
private_key,
cred['private_key_id'],
cred['client_email'],
scopes)
print_jwt(s_jwt)
verify_signed_jwt(s_jwt, public_key)