Search code examples
oauth-2.0jwtfastapiaccess-token

Token is not recognized after authentication in FastAPI


I am creating a basic authentication register/login flow with FastAPI. However, after the user has successfully registered and logged in, the token does not get recognized. It works fine using the "/docs" through Swagger UI, but not from the main app.

Here is my code: main.py

import uvicorn
from fastapi import Depends, HTTPException
from auth import AuthHandler
from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

# Application object plus static-file and template configuration.
app = FastAPI()
# Serve files from the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
# Templates are loaded from the local "templates" directory.
templates = Jinja2Templates(directory="templates")

# Shared helper for password hashing and JWT encode/decode.
auth_handler = AuthHandler()
# In-memory user store: a list of {'username': ..., 'password': <hash>} dicts.
# Contents are lost whenever the process restarts.
users = []


@app.get('/', response_class=HTMLResponse)
def get_register_form(request: Request):
    """Render the registration form page (register.html)."""
    context = {"request": request}
    return templates.TemplateResponse("register.html", context)


@app.post('/', response_class=HTMLResponse)
def register(request: Request, username: str = Form(...), password: str = Form(...)):
    """Register a new user from the submitted form fields.

    The password is hashed before the user is appended to the in-memory
    ``users`` list. Responds with the rendered success.html page.

    Raises:
        HTTPException: 400 when the username is already present in ``users``.
    """
    name_taken = any(entry['username'] == username for entry in users)
    if name_taken:
        print('Username is taken!')
        raise HTTPException(status_code=400, detail='Username is taken!')

    new_user = {
        'username': username,
        'password': auth_handler.get_password_hash(password),
    }
    users.append(new_user)
    print('User:', username, 'registered!')
    return templates.TemplateResponse("success.html", {"request": request})


@app.get('/login', response_class=HTMLResponse)
def get_login_form(request: Request):
    """Render the login form page (login.html)."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)


@app.post('/login')
def login(request: Request, username: str = Form(...), password: str = Form(...)):
    """Check the submitted credentials and return a freshly issued token.

    Returns:
        A dict ``{'token': <jwt>}``. The token is only placed in the response
        body; this handler does not store it anywhere on the client.

    Raises:
        HTTPException: 401 when the username is unknown or the password does
            not match the stored hash.
    """
    # First stored user with this username, or None when there is no match.
    user = next((entry for entry in users if entry['username'] == username), None)

    credentials_ok = user is not None and auth_handler.verify_password(password, user['password'])
    if not credentials_ok:
        print('Invalid username and/or password!')
        raise HTTPException(status_code=401, detail='Invalid username and/or password!')

    return {'token': auth_handler.encode_token(user['username'])}


@app.get('/protected')
def protected(username=Depends(auth_handler.auth_wrapper)):
    """Return the caller's name as resolved by ``auth_handler.auth_wrapper``.

    The dependency supplies ``username``; this handler only wraps it in a dict.
    """
    body = {'name': username}
    return body


# Start the uvicorn server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    uvicorn.run(app)

Here is my code: auth.py

import jwt
from fastapi import HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from passlib.context import CryptContext
from datetime import datetime, timedelta


class AuthHandler():
    """Password hashing and JWT helpers used by the route handlers."""

    # Bearer-token scheme used by ``auth_wrapper`` to obtain the credentials.
    security = HTTPBearer()
    # bcrypt-based context for hashing and verifying passwords.
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    # Key used to sign and verify tokens (HS256).
    # NOTE(review): hard-coded placeholder; load a real secret from configuration.
    secret = 'SECRET'

    def get_password_hash(self, password):
        """Return the hash of ``password`` produced by ``pwd_context``."""
        return self.pwd_context.hash(password)

    def verify_password(self, plain_password, hashed_password):
        """Return whether ``plain_password`` matches ``hashed_password``."""
        return self.pwd_context.verify(plain_password, hashed_password)

    def encode_token(self, user_id):
        """Return a signed HS256 JWT for ``user_id`` that expires in 5 minutes.

        The payload carries ``exp`` (expiry), ``iat`` (issued at) and ``sub``
        (set to ``user_id``).
        """
        # Read the clock once so that exp - iat is exactly the token lifetime.
        issued_at = datetime.utcnow()
        payload = {
            'exp': issued_at + timedelta(days=0, minutes=5),
            'iat': issued_at,
            'sub': user_id
        }
        return jwt.encode(
            payload,
            self.secret,
            algorithm='HS256'
        )

    def decode_token(self, token):
        """Verify ``token`` and return its ``sub`` claim.

        Raises:
            HTTPException: 401 with detail 'Signature has expired' for an
                expired token, or 'Invalid token' for any other token error.
        """
        try:
            payload = jwt.decode(token, self.secret, algorithms=['HS256'])
            return payload['sub']
        except jwt.ExpiredSignatureError as exc:
            raise HTTPException(status_code=401, detail='Signature has expired') from exc
        except jwt.InvalidTokenError as exc:
            raise HTTPException(status_code=401, detail='Invalid token') from exc

    def auth_wrapper(self, auth: HTTPAuthorizationCredentials = Security(security)):
        """Dependency that returns the ``sub`` claim of the request's bearer token.

        The credentials come from the ``HTTPBearer`` scheme, i.e. from the
        request's ``Authorization`` header. A token that is only returned in a
        response body or kept in a cookie is not picked up here.
        """
        return self.decode_token(auth.credentials)

Here is my forms.html: register.html and login.html are the same.

<!doctype html>
<html lang="en">
<head>
    <link rel="stylesheet" href="../static/styles.css">
    <title>Document</title>
</head>
<body>
    <div id="form">
        <!-- No action attribute: the form posts back to the URL it was served from. -->
        <form method="post">
            <h3>Login</h3>
            <label for="username">Username:</label><br>
            <input type="text" name="username" id="username"><br>
            <label for="password">Password:</label><br>
            <!-- type="password" so the browser masks the value instead of showing it in clear text. -->
            <input type="password" name="password" id="password"><br><br>
            <input type="submit" value="Submit" id="sub">
        </form>
    </div>
</body>
</html>

The error I get when going to 127.0.0.1/protected is:

{"detail":"Not authenticated"}

How can I fix this, so that it recognizes the token from the user just like in docs?


Solution

  • I found a solution on the advice of @MatsLindh. I also simplified the code, imports, etc. a lot. You will obviously still need the HTML files with the form and whatever fields you need. In my case I just added the email and password.

    Please note that this is not "best practice", especially storing the password in the same database table, even if it's hashed.

    import uvicorn
    import sqlite3
    import jwt
    from datetime import datetime, timedelta
    from fastapi import FastAPI, Form, HTTPException, Cookie, Request, Depends
    from fastapi.responses import HTMLResponse, RedirectResponse
    from fastapi.staticfiles import StaticFiles
    from fastapi.templating import Jinja2Templates
    from passlib.context import CryptContext
    
    
    # One module-level SQLite connection and cursor, shared by every handler below.
    # check_same_thread=False allows the connection to be used from threads
    # other than the one that created it.
    connection = sqlite3.connect("users.db", check_same_thread=False)
    cursor = connection.cursor()
    
    # Application object plus static-file and template configuration.
    app = FastAPI()
    # Serve files from the local "static" directory under the /static URL prefix.
    app.mount("/static", StaticFiles(directory="static"), name="static")
    # Templates are loaded from the local "templates" directory.
    templates = Jinja2Templates(directory="templates")
    
    
    class AuthHandler:
        """Password hashing and JWT helpers used by the route handlers."""

        # Key used to sign and verify tokens (HS256). The payload is signed,
        # not encrypted, so anyone holding the token can read its contents.
        # NOTE(review): hard-coded placeholder; load a real secret from configuration.
        secret = 'SECRET'
        # bcrypt-based context for hashing and verifying passwords.
        bcrypt_obj = CryptContext(schemes=["bcrypt"], deprecated="auto")

        def get_password_hash(self, password):
            """Return the hash of ``password`` produced by ``bcrypt_obj``."""
            return self.bcrypt_obj.hash(password)

        def verify_password(self, plain_password, hashed_password):
            """Return whether ``plain_password`` matches ``hashed_password``."""
            return self.bcrypt_obj.verify(plain_password, hashed_password)

        def encode_token(self, user_id):
            """Return a signed HS256 JWT for ``user_id`` that expires in 5 minutes.

            The payload carries ``exp`` (expiry), ``iat`` (issued at) and
            ``sub`` (set to ``user_id``).
            """
            # Read the clock once so that exp - iat is exactly the token lifetime.
            issued_at = datetime.utcnow()
            payload = {
                'exp': issued_at + timedelta(days=0, minutes=5),
                'iat': issued_at,
                'sub': user_id
            }
            return jwt.encode(
                payload,
                self.secret,
                algorithm='HS256'
            )

        def decode_token(self, token):
            """Verify ``token`` and return its ``sub`` claim.

            Raises:
                HTTPException: 401 with detail 'Signature has expired' for an
                    expired token, or 'Invalid token' for any other token error.
            """
            try:
                payload = jwt.decode(token, self.secret, algorithms=['HS256'])
                return payload['sub']
            except jwt.ExpiredSignatureError as exc:
                raise HTTPException(status_code=401, detail='Signature has expired') from exc
            except jwt.InvalidTokenError as exc:
                raise HTTPException(status_code=401, detail='Invalid token') from exc

        def grant_access(self, token, users_db):
            """Return True when the token's subject matches any user's email.

            Args:
                token: JWT whose ``sub`` claim is compared against the emails.
                users_db: iterable of mappings that each have an 'email' key.

            Returns:
                True if some entry's 'email' equals the decoded subject,
                otherwise False (including when ``users_db`` is empty).

            Raises:
                HTTPException: 401 from ``decode_token`` for a bad token.
            """
            # Decode once up front. The previous loop returned on its first
            # iteration, so only the first user was ever compared.
            subject = self.decode_token(token)
            return any(entry['email'] == subject for entry in users_db)
    
    
    # Single shared AuthHandler instance used by the route handlers below.
    auth_handler = AuthHandler()
    
    
    @app.get('/', response_class=HTMLResponse)
    def get_register_form(request: Request):
        """Render the registration form page (register.html)."""
        context = {"request": request}
        return templates.TemplateResponse("register.html", context)
    
    
    @app.post('/', response_class=HTMLResponse)
    def register(email: str = Form(...), password: str = Form(...)):
        """Create a user row for the submitted email and redirect to /login.

        Args:
            email: address submitted in the form; must not already exist.
            password: plain-text password; only its hash is stored.

        Returns:
            A 302 redirect to the login page.

        Raises:
            HTTPException: 400 when a row with this email already exists.
        """
        cursor.execute("select * from users where email=:e", {"e": email})
        user_with_same_email_list = cursor.fetchall()
        if user_with_same_email_list:
            print(user_with_same_email_list)
            print('Email already registered!')
            raise HTTPException(status_code=400, detail='Email already registered!')

        hashed_password = auth_handler.get_password_hash(password)
        # Bind the values instead of concatenating them into the SQL text:
        # the email comes straight from the form, so building the statement
        # by string concatenation allowed SQL injection.
        # NOTE(review): user_id is still hard-coded to 1, as before. If the
        # column is unique or a primary key, a second registration will fail.
        # Confirm against the table schema.
        sqlite_insert_query = (
            "INSERT INTO users (user_id, username, email, hashed_password, eoa) "
            "VALUES (1, '', :email, :hashed_password, '')"
        )
        cursor.execute(
            sqlite_insert_query,
            {"email": email, "hashed_password": hashed_password},
        )
        connection.commit()
        print('User with email:', email, 'registered!')
        response = RedirectResponse(url="/login")
        response.status_code = 302
        return response
    
    
    @app.get('/login', response_class=HTMLResponse)
    def get_login_form(request: Request):
        """Render the login form page (login.html)."""
        context = {"request": request}
        return templates.TemplateResponse("login.html", context)
    
    
    @app.post('/login')
    def login(email: str = Form(...), password: str = Form(...)):
        """Check the submitted credentials and set the token cookie.

        On success, responds with a 302 redirect to /check_cookie carrying the
        JWT in an "Authorization" cookie (secure, HTTP-only).

        Raises:
            HTTPException: 400 when no row has this email; 401 when the
                password does not match the stored hash.
        """
        cursor.execute("select * from users where email=:e", {"e": email})
        rows = cursor.fetchall()

        if not rows:
            print('No user with this email!')
            raise HTTPException(status_code=400, detail='No user with this email!')

        # Column 3 of the row is presumably hashed_password; verify against the schema.
        if email is None or not auth_handler.verify_password(password, rows[0][3]):
            print('Invalid email and/or password!')
            raise HTTPException(status_code=401, detail='Invalid email and/or password!')

        redirect = RedirectResponse(url="/check_cookie", status_code=302)
        redirect.set_cookie(
            key="Authorization",
            value=auth_handler.encode_token(email),
            secure=True,
            httponly=True,
        )
        return redirect
    
    
    @app.get("/check_cookie")
    async def check_cookie(Authorization: str | None = Cookie(None)):
        """Validate the "Authorization" cookie and redirect to /protected.

        Also used as a dependency by the /protected handler.

        Raises:
            HTTPException: 401 when the cookie is missing or empty, when the
                token fails to decode, or when no user row matches the
                token's subject.
        """
        if not Authorization:
            print("No token found")
            raise HTTPException(status_code=401, detail='No token found')

        subject = auth_handler.decode_token(Authorization)
        cursor.execute("select * from users where email=:e", {"e": subject})
        rows = cursor.fetchall()
        if not rows:
            print('Invalid token')
            raise HTTPException(status_code=401, detail='Invalid token')

        print('Access granted!')
        return RedirectResponse(url="/protected", status_code=302)
    
    
    @app.get('/protected', response_class=HTMLResponse)
    async def protected(request: Request, email=Depends(check_cookie)):
        """Render logged_in.html once the ``check_cookie`` dependency has passed.

        ``email`` receives whatever ``check_cookie`` returns and is not used
        here; the dependency acts only as an access gate.
        """
        context = {"request": request}
        return templates.TemplateResponse("logged_in.html", context)
    
    
    @app.get("/logged_out", response_class=HTMLResponse)
    async def logged_out(request: Request):
        """Render the logged-out confirmation page (logged_out.html)."""
        context = {"request": request}
        return templates.TemplateResponse("logged_out.html", context)
    
    
    @app.get("/logout")
    async def route_logout_and_remove_cookie():
        """Clear the "Authorization" cookie and redirect to /logged_out."""
        response = RedirectResponse(url="/logged_out")
        # No domain argument: the login handler sets the cookie without one,
        # so the deletion must match that. A hard-coded domain="127.0.0.1"
        # failed to clear the cookie when the app was reached under any other
        # host name, such as "localhost".
        response.delete_cookie("Authorization")
        return response
    
    
    # Start the uvicorn server only when this file is executed as a script,
    # not when it is imported as a module.
    if __name__ == '__main__':
        uvicorn.run(app)