I'm new with fastapi security and I'm trying to implement the authentication thing and then use scopes.
The problem is that I'm setting an expiration time for the token but after the expiration time the user still authenticated and can access services
import json
from jose import jwt,JWTError
from typing import Optional
from datetime import datetime,timedelta
from fastapi.security import OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes
from fastapi import APIRouter, UploadFile, File, Depends, HTTPException,status
from tinydb import TinyDB,where
from tinydb import Query
from passlib.hash import bcrypt
from pydantic import BaseModel
from passlib.context import CryptContext
##
class TokenData(BaseModel):
username: Optional[str] = None
class Token(BaseModel):
access_token: str
token_type: str
router = APIRouter()
SECRET_KEY="e79b2a1eaa2b801bc81c49127ca4607749cc2629f73518194f528fc5c8491713"
ALGORITHM="HS256"
ACCESS_TOKEN_EXPIRE_MINUTES=1
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/dev-service/api/v1/openvpn/token")
db=TinyDB('app/Users.json')
Users = db.table('User')
User = Query
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class User(BaseModel):
username: str
password:str
def get_user(username: str):#still
user= Users.search((where('name') ==name))
if user:
return user[0]
@router.post('/verif')
async def verify_user(name,password):
user = Users.search((where('name') ==name))
print(user)
if not user:
return False
print(user)
passw=user[0]['password']
if not bcrypt.verify(password,passw):
return False
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=1)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@router.post("/token", response_model=Token)
async def token_generate(form_data:OAuth2PasswordRequestForm=Depends()):
user=await verify_user(form_data.username,form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(data={"sub": form_data.username}, expires_delta=access_token_expires)
return {"access_token": access_token, "token_type": "bearer"}
@router.get('/user/me')
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user =Users.search(where('name') ==token_data.username)
if user is None:
raise credentials_exception
return user
@router.post('/user')
async def create_user(name,password):
Users.insert({'name':name,'password':bcrypt.hash(password)})
return True
How can I really see the expiration of the token and how can I add the scopes?
I had pretty much the same confusion when I started out with FastAPI. The access token you created will not expire on its own, so you will need to check if it is expired while validating the token at get_current_user
. You could modify your TokenData
schema to the code below:
class TokenData(BaseModel):
username: Optional[str] = None
expires: Optional[datetime]
And your get_current_user
to:
@router.get('/user/me')
async def get_current_user(token: str = Depends(oauth2_scheme)):
# get the current user from auth token
# define credential exception
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# decode token and extract username and expires data
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
expires = payload.get("exp")
except JWTError:
raise credentials_exception
# validate username
if username is None:
raise credentials_exception
token_data = TokenData(username=username, expires=expires)
user = Users.search(where('name') == token_data.username)
if user is None:
raise credentials_exception
# check token expiration
if expires is None:
raise credentials_exception
if datetime.utcnow() > token_data.expires:
raise credentials_exception
return user
Scopes is a huge topic on its own, so I can't cover it here. However, you can read more on it at the fastAPI docs here