I am using Google OAuth2 flow as described here: https://developers.google.com/identity/protocols/oauth2/web-server.
First I check if user credentials are already in a DB, if not the auth flow is initiated. After exchanging code to an access token, credentials are saved in a DB. There along with an access token and a refresh token a token expiration dateTime is saved, which is 1 hour (credentials.expiry).
Then I wanted to check how the access token would be refreshed. So I explicitly requested API after the token expiration time. The previously saved credentials are in the DB, so I get them from there. Its validity is checked, and the access token should be updated. However, by some reason, the token is valid and by making a test request to the Google API I am kinda can make requests, no error is raised.
I cannot get why token is valid, when it obviously is expired. I also checked the token validity here: https://www.googleapis.com/oauth2/v3/tokeninfo?access_token=AT and got { "error_description": "Invalid Value" }.
My code:
def google_oauth_init_flow(request):
# get user id
user_id = request.GET.get('user_id', None)
print(user_id)
user = User.objects.get(id=user_id)
# set id to a state parameter
request.session['state'] = str(user.id)
flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
os.path.abspath(os.path.join(os.path.dirname(__file__), KEY_FILE_LOCATION)),
scopes=SCOPES)
flow.redirect_uri = REDIRECT_URI
authorization_url, state = flow.authorization_url(
# Enable offline access so that you can refresh an access token without
# re-prompting the user for permission. Recommended for web server apps.
access_type='offline',
# Enable incremental authorization. Recommended as a best practice.
include_granted_scopes='true',
state=str(user.id)
)
return HttpResponseRedirect(authorization_url)
def google_oauth_exchange_token(request):
state = request.session.get('state', 'No state')
print(state)
flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
os.path.abspath(os.path.join(os.path.dirname(__file__), KEY_FILE_LOCATION)),
scopes=SCOPES,
state=state)
flow.redirect_uri = REDIRECT_URI
authorization_response = request.build_absolute_uri()
flow.fetch_token(authorization_response=authorization_response)
credentials = flow.credentials
# save creds in DB
try:
save_credentials(credentials, state)
except IntegrityError as e:
if 'unique constraint' in e.message:
# duplicate detected
return HttpResponse('Violation of unique constraint')
def google_oauth_check_token(credentials, user_id):
# convert dict into credentials instance
if type(credentials) == dict:
credentials = init_creds_instance(credentials)
if credentials.expired:
print('Token is expired getting new one...')
# refresh credentials
request = google.auth.transport.requests.Request()
credentials.refresh(request)
# alternative method
# credentials.refresh(httplib2.Http())
# update token in DB
SystemServiceTokens.objects.filter(id=user_id).update(name='token', key=credentials.token)
return credentials
else: # always Token is valid
print('TOKEN', credentials.token)
print('EXPIRY', credentials.expiry)
print('REFRESH TOKEN', credentials.refresh_token)
print('Token is valid')
return credentials
def get_user_info(credentials):
user_info = build(serviceName='oauth2', version='v2', credentials=credentials)
return user_info.userinfo().get().execute()
def save_credentials(credentials, state):
user = User.objects.get(id=state)
google_oauth_check_token(credentials, user.id)
# get email
user_info = get_user_info(credentials)
model = SystemServiceTokens()
# check duplicate values
if not model.objects.filter(user_id=user.id, email=user_info['email'], service_id='ga').exists():
print("Entry contained in queryset")
model.token = credentials.token
model.refresh_token = credentials.refresh_token
model.expires_at = datetime.datetime.strftime(credentials.expiry, '%Y-%m-%d %H:%M:%S')
model.user = user
model.email = user_info['email']
model.service_id = 'ga'
return model.save()
else:
return HttpResponse('Unique constraint violation')
# No errors
def test_request_google(request):
credentials = SystemServiceTokens.objects.filter(user_id=9).first() # DEBUG ID
print(model_to_dict(credentials))
credentials = google_oauth_check_token(model_to_dict(credentials), 9)
# test
user_info = build(serviceName='oauth2', version='v2', credentials=credentials)
print(user_info.userinfo().get().execute())
drive = build('drive', 'v2', credentials=credentials)
If the python client library can detect a refresh token its going to refresh it for you. Assuming your code works correctly you should never have an access token expire the library will refresh it five minutes before it is due to expire.
My Python isn't the best but its probably this in the source code. http.py#L1559