I am creating a basic authentication register/login flow with FastAPI. However, after the user has successfully registered and logged in, the token does not get recognized. It works fine through Swagger UI at "/docs", but not from the main app.
Here is my code: main.py
import uvicorn
from fastapi import Depends, HTTPException
from auth import AuthHandler
from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
# Application instance that all routes below are registered on.
app = FastAPI()
# Serve files from the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
# HTML templates are loaded from the local "templates" directory.
templates = Jinja2Templates(directory="templates")
# Shared helper for password hashing and JWT encode/decode (defined in auth.py).
auth_handler = AuthHandler()
# In-memory user store: register() appends {'username', 'password'} dicts here.
users = []
@app.get('/', response_class=HTMLResponse)
def get_register_form(request: Request):
    """Render the registration form (register.html)."""
    context = {"request": request}
    return templates.TemplateResponse("register.html", context)
@app.post('/', response_class=HTMLResponse)
def register(request: Request, username: str = Form(...), password: str = Form(...)):
    """Register a new user from the submitted form and render success.html.

    Raises:
        HTTPException: 400 when the username is already present in ``users``.
    """
    name_in_use = any(entry['username'] == username for entry in users)
    if name_in_use:
        print('Username is taken!')
        raise HTTPException(status_code=400, detail='Username is taken!')

    # Only the hash of the password is kept in the user store.
    new_user = {
        'username': username,
        'password': auth_handler.get_password_hash(password),
    }
    users.append(new_user)
    print('User:', username, 'registered!')
    return templates.TemplateResponse("success.html", {"request": request})
@app.get('/login', response_class=HTMLResponse)
def get_login_form(request: Request):
    """Render the login form (login.html)."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
@app.post('/login')
def login(request: Request, username: str = Form(...), password: str = Form(...)):
    """Check the submitted credentials and return a token as JSON.

    Returns:
        ``{'token': <encoded token>}`` for a known user with a matching password.

    Raises:
        HTTPException: 401 when the user is unknown or the password is wrong.
    """
    # First stored user with this username, or None when there is no match.
    account = next(
        (entry for entry in users if entry['username'] == username),
        None,
    )
    credentials_ok = account is not None and auth_handler.verify_password(
        password, account['password']
    )
    if not credentials_ok:
        print('Invalid username and/or password!')
        raise HTTPException(status_code=401, detail='Invalid username and/or password!')

    return {'token': auth_handler.encode_token(account['username'])}
@app.get('/protected')
def protected(username=Depends(auth_handler.auth_wrapper)):
    """Return the name resolved by the auth_wrapper dependency as JSON."""
    body = {'name': username}
    return body
# Start the uvicorn server only when this file is executed as a script.
if __name__ == '__main__':
    uvicorn.run(app)
Here is my code: auth.py
import jwt
from fastapi import HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from passlib.context import CryptContext
from datetime import datetime, timedelta
class AuthHandler:
    """Password hashing and JWT helpers used by the route handlers."""

    # Bearer-token security scheme used as the dependency in auth_wrapper.
    security = HTTPBearer()
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    # NOTE(review): hard-coded signing key; load it from configuration or the
    # environment before using this outside a demo.
    secret = 'SECRET'

    def get_password_hash(self, password):
        """Return the hash of ``password`` produced by ``pwd_context``."""
        return self.pwd_context.hash(password)

    def verify_password(self, plain_password, hashed_password):
        """Return the result of verifying ``plain_password`` against ``hashed_password``."""
        return self.pwd_context.verify(plain_password, hashed_password)

    def encode_token(self, user_id):
        """Return an HS256-signed JWT for ``user_id`` that expires in 5 minutes."""
        # Local import keeps this block self-contained; the file's import line
        # only brings in ``datetime`` and ``timedelta``.
        from datetime import timezone

        # Take one timezone-aware timestamp so 'exp' is exactly 5 minutes after
        # 'iat'. datetime.utcnow() is deprecated since Python 3.12.
        issued_at = datetime.now(timezone.utc)
        payload = {
            'exp': issued_at + timedelta(days=0, minutes=5),
            'iat': issued_at,
            'sub': user_id,
        }
        return jwt.encode(payload, self.secret, algorithm='HS256')

    def decode_token(self, token):
        """Decode ``token`` and return its 'sub' claim.

        Raises:
            HTTPException: 401 when the token has expired or is otherwise invalid.
        """
        try:
            payload = jwt.decode(token, self.secret, algorithms=['HS256'])
            return payload['sub']
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail='Signature has expired')
        except jwt.InvalidTokenError:
            raise HTTPException(status_code=401, detail='Invalid token')

    def auth_wrapper(self, auth: HTTPAuthorizationCredentials = Security(security)):
        """Dependency: decode the credentials supplied by ``security``.

        NOTE(review): HTTPBearer presumably reads the token from the request's
        Authorization header, so a plain browser navigation (which sends no
        such header) would not be authenticated - confirm against FastAPI docs.
        """
        return self.decode_token(auth.credentials)
Here is my forms.html: register.html and login.html are the same.
<!doctype html>
<html lang="en">
<head>
    <link rel="stylesheet" href="../static/styles.css">
    <title>Document</title>
</head>
<body>
<div id="form">
    <!-- No action attribute: the form posts back to the URL that served it. -->
    <form method="post">
        <h3>Login</h3>
        <label for="username">Username:</label><br>
        <input type="text" name="username" id="username"><br>
        <label for="password">Password:</label><br>
        <!-- type="password" masks the input instead of showing it in clear text. -->
        <input type="password" name="password" id="password"><br><br>
        <input type="submit" value="Submit" id="sub">
    </form>
</div>
</body>
</html>
The error I get when going to 127.0.0.1/protected is:
{"detail":"Not authenticated"}
How can I fix this, so that it recognizes the token from the user just like in docs?
I found a solution based on advice from @MatsLindh. I also simplified the code, imports, etc. considerably. You will obviously still need the HTML files with the form and whatever fields you need; in my case I just added the email and password.
Please note that this is not "best practice", especially storing the password in the same database table, even if it's hashed.
import uvicorn
import sqlite3
import jwt
from datetime import datetime, timedelta
from fastapi import FastAPI, Form, HTTPException, Cookie, Request, Depends
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from passlib.context import CryptContext
# One SQLite connection and cursor shared by every handler in this module.
# NOTE(review): check_same_thread=False disables sqlite3's same-thread check;
# nothing here serializes access to the shared cursor - confirm that is
# acceptable for how the server runs these handlers.
connection = sqlite3.connect("users.db", check_same_thread=False)
cursor = connection.cursor()
# Application instance that all routes below are registered on.
app = FastAPI()
# Serve files from the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
# HTML templates are loaded from the local "templates" directory.
templates = Jinja2Templates(directory="templates")
class AuthHandler:
    """Password hashing and JWT helpers used by the route handlers."""

    # Key used to sign and verify tokens (HS256 signs; it does not encrypt).
    # NOTE(review): hard-coded value; load it from configuration or the
    # environment before using this outside a demo.
    secret = 'SECRET'
    bcrypt_obj = CryptContext(schemes=["bcrypt"], deprecated="auto")

    def get_password_hash(self, password):
        """Return the hash of ``password`` produced by ``bcrypt_obj``."""
        return self.bcrypt_obj.hash(password)

    def verify_password(self, plain_password, hashed_password):
        """Return the result of verifying ``plain_password`` against ``hashed_password``."""
        return self.bcrypt_obj.verify(plain_password, hashed_password)

    def encode_token(self, user_id):
        """Return an HS256-signed JWT for ``user_id`` that expires in 5 minutes."""
        # Local import keeps this block self-contained; the file's import line
        # only brings in ``datetime`` and ``timedelta``.
        from datetime import timezone

        # Take one timezone-aware timestamp so 'exp' is exactly 5 minutes after
        # 'iat'. datetime.utcnow() is deprecated since Python 3.12.
        issued_at = datetime.now(timezone.utc)
        payload = {
            'exp': issued_at + timedelta(days=0, minutes=5),
            'iat': issued_at,
            'sub': user_id,
        }
        return jwt.encode(payload, self.secret, algorithm='HS256')

    def decode_token(self, token):
        """Decode ``token`` and return its 'sub' claim.

        Raises:
            HTTPException: 401 when the token has expired or is otherwise invalid.
        """
        try:
            payload = jwt.decode(token, self.secret, algorithms=['HS256'])
            return payload['sub']
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail='Signature has expired')
        except jwt.InvalidTokenError:
            raise HTTPException(status_code=401, detail='Invalid token')

    def grant_access(self, token, users_db):
        """Return True when the token's subject matches any user's 'email'.

        The token is decoded once, before scanning, and every entry of
        ``users_db`` is considered; an empty ``users_db`` yields False.

        Raises:
            HTTPException: 401 (from decode_token) for an expired/invalid token.
        """
        email = self.decode_token(token)
        return any(user['email'] == email for user in users_db)
# Shared AuthHandler instance used by the route handlers below.
auth_handler = AuthHandler()
@app.get('/', response_class=HTMLResponse)
def get_register_form(request: Request):
    """Render the registration form (register.html)."""
    context = {"request": request}
    return templates.TemplateResponse("register.html", context)
@app.post('/', response_class=HTMLResponse)
def register(email: str = Form(...), password: str = Form(...)):
    """Create a user row for ``email`` and redirect to the login page.

    Raises:
        HTTPException: 400 when a row with this email already exists.
    """
    cursor.execute("select * from users where email=:e", {"e": email})
    user_with_same_email_list = cursor.fetchall()
    if user_with_same_email_list:
        print(user_with_same_email_list)
        print('Email already registered!')
        raise HTTPException(status_code=400, detail='Email already registered!')

    hashed_password = auth_handler.get_password_hash(password)
    # Bind the user-supplied values as parameters; concatenating form input
    # into the SQL text allowed SQL injection through the email field.
    # NOTE(review): user_id is always 1 - if that column is meant to be unique,
    # a second registration will fail; confirm against the table schema.
    cursor.execute(
        "INSERT INTO users (user_id, username, email, hashed_password, eoa) "
        "VALUES (1, '', :email, :hashed_password, '')",
        {"email": email, "hashed_password": hashed_password},
    )
    connection.commit()
    print('User with email:', email, 'registered!')
    # 302 makes the browser follow up with a GET for the login page.
    return RedirectResponse(url="/login", status_code=302)
@app.get('/login', response_class=HTMLResponse)
def get_login_form(request: Request):
    """Render the login form (login.html)."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
@app.post('/login')
def login(email: str = Form(...), password: str = Form(...)):
    """Verify the credentials, set the token cookie, redirect to /check_cookie.

    Raises:
        HTTPException: 400 when no user has this email; 401 when the password
            does not match the stored hash.
    """
    # Select the hash by column name rather than relying on its position in
    # ``select *``.
    cursor.execute("select hashed_password from users where email=:e", {"e": email})
    row = cursor.fetchone()
    if row is None:
        print('No user with this email!')
        raise HTTPException(status_code=400, detail='No user with this email!')

    # The former ``email is None`` test was unreachable: a missing email can
    # never match a row, so the branch above is always taken first.
    if not auth_handler.verify_password(password, row[0]):
        print('Invalid email and/or password!')
        raise HTTPException(status_code=401, detail='Invalid email and/or password!')

    token = auth_handler.encode_token(email)
    # 302 makes the browser follow up with a GET instead of re-posting the form.
    response = RedirectResponse(url="/check_cookie", status_code=302)
    response.set_cookie(key="Authorization", value=token, secure=True, httponly=True)
    return response
@app.get("/check_cookie")
async def check_cookie(Authorization: str | None = Cookie(None)):
    """Validate the "Authorization" cookie and redirect to /protected.

    Raises:
        HTTPException: 401 when the cookie is missing/empty, when the token
            fails to decode, or when no user row matches the decoded email.
    """
    if not Authorization:
        print("No token found")
        raise HTTPException(status_code=401, detail='No token found')

    email = auth_handler.decode_token(Authorization)
    cursor.execute("select * from users where email=:e", {"e": email})
    matching_rows = cursor.fetchall()
    if not matching_rows:
        print('Invalid token')
        raise HTTPException(status_code=401, detail='Invalid token')

    print('Access granted!')
    return RedirectResponse(url="/protected", status_code=302)
@app.get('/protected', response_class=HTMLResponse)
async def protected(request: Request, email=Depends(check_cookie)):
    """Render logged_in.html; the check_cookie dependency runs first."""
    context = {"request": request}
    return templates.TemplateResponse("logged_in.html", context)
@app.get("/logged_out", response_class=HTMLResponse)
async def logged_out(request: Request):
    """Render the logged-out page (logged_out.html)."""
    context = {"request": request}
    return templates.TemplateResponse("logged_out.html", context)
@app.get("/logout")
async def route_logout_and_remove_cookie():
    """Delete the "Authorization" cookie and redirect to /logged_out."""
    response = RedirectResponse(url="/logged_out")
    # login() sets the cookie without a domain, so delete it the same way.
    # A hard-coded domain="127.0.0.1" only works when the app is reached
    # through that exact host.
    response.delete_cookie("Authorization")
    return response
# Start the uvicorn server only when this file is executed as a script.
if __name__ == '__main__':
    uvicorn.run(app)