Search code examples
python-3.xflask-sqlalchemyflask-restfulflask-httpauth

Verify Basic Auth has the same Username as in URL [Flask]


Basically I am doing a hybrid of sending the username in the url and in Basic Auth when I need the user to be authenticated.

My full code is:

from flask import Flask, request, send_from_directory, g
from flask_restful import Resource, Api
from flask_httpauth import HTTPBasicAuth
from sqlalchemy import create_engine, Column, Integer, String, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from passlib.apps import custom_app_context as pwd_context
import pathlib
import shutil
import os

SUCCESS = 200
UNAUTHORIZED = 401
BAD_REQUEST = 402
INTERNAL_ERROR = 500
INVALID_MEDIA = 415

db_engine = create_engine('mysql://root:mypasswordisembarrassing@localhost/store', echo=True)
Base = declarative_base()


class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String(16), index=True, unique=True, nullable=False)
    password_hash = Column(String(128), nullable=False)
    avatar = Column(Boolean, default=False, nullable=False)

    def hash_password(self, password):
        self.password_hash = pwd_context.encrypt(password)

    def verify_password(self, password):
        return pwd_context.verify(password, self.password_hash)

    def __repr__(self):
        return "<User(username='%s', password='%s', avatar path='%s')>" % (
                            self.username, self.password_hash, self.avatar)

Base.metadata.create_all(db_engine)

Session = sessionmaker(bind=db_engine)
session = Session()

app = Flask(__name__)
app.secret_key = 'the quick brown fox jumps over the lazy dog'
api = Api(app) #blueprint?
auth = HTTPBasicAuth()


@auth.verify_password
def verify_password(username, password):
    u = session.query(User).filter_by(username=username).first()
    if not u or not u.verify_password(password):
        return False
    g.user = u
    return True


class Avatar(Resource):
    def get(self, name):
        u = session.query(User).filter_by(username=name).first()
        assert u.avatar is not None
        if u.avatar:
            send_from_directory('static/u/%s/avatar.png', name) #Aparently Flask should not do this? Apache...
        else:
            send_from_directory('static/defaults/u/avatar.png')
        return "Success", SUCCESS

    @auth.login_required
    def post(self, name):
        u = g.user
        if 'avatar' not in request.files:
            return "No file recieved", BAD_REQUEST
        file = request.files['avatar']
        if file.content_type != 'image/png':
            return "Avatar must be a png", INVALID_MEDIA
        assert pathlib.Path("static").exists()
        pathlib.Path("static/u/%s" % u.username).mkdir(parents=True, exist_ok=True)
        file.save('static/u/%s/avatar.png' % u.username)
        u.avatar = True
        session.commit()
        return "Success", SUCCESS


class Player(Resource):
    def post(self, name):
        password = request.json.get('password')
        if not valid_password(password):
            return "Password needs to be longer than 5 characters", BAD_REQUEST
        u = User(username=name)
        u.hash_password(password)
        session.add(u)
        session.commit()
        return "Success", SUCCESS

    @auth.login_required
    def delete(self, name):
        doomed_user = g.user
        shutil.rmtree('static/u/%s' % doomed_user.username)
        session.delete(doomed_user)
        session.commit()
        return "Success", SUCCESS


def valid_password(password):
    return len(password) >= 6

api.add_resource(Avatar, '/u/<string:name>/avatar.png')
api.add_resource(Player, '/u/<string:name>')

if __name__ == '__main__':
    app.run()

I am concerned about this part:

class Avatar(Resource):
    def get(self, name):
        u = session.query(User).filter_by(username=name).first()
        assert u.avatar is not None
        if u.avatar:
            send_from_directory('static/u/%s/avatar.png', name) #Aparently Flask should not do this? Apache...
        else:
            send_from_directory('static/defaults/u/avatar.png')
        return "Success", SUCCESS

    @auth.login_required
    def post(self, name):
        u = g.user
        if 'avatar' not in request.files:
            return "No file recieved", BAD_REQUEST
        file = request.files['avatar']
        if file.content_type != 'image/png':
            return "Avatar must be a png", INVALID_MEDIA
        assert pathlib.Path("static").exists()
        pathlib.Path("static/u/%s" % u.username).mkdir(parents=True, exist_ok=True)
        file.save('static/u/%s/avatar.png' % u.username)
        u.avatar = True
        session.commit()
        return "Success", SUCCESS

Where I would like to use the variable name when doing authorizing but if someone sends a request to someone else's url with their credentials they could change other user data, so for now I'm just using the authorized username in those functions.

For example: url/u/John -uBill:password should be marked invalid

Is there any neat way I can have the @login_required decorator check if the url is also valid? I would rather not do if name != u.username: ... at the beginning of every protected route.


Solution

  • You have two options.

    If you need to do this for every route, then you can add the username check to your verify_password callback function:

    @auth.verify_password
    def verify_password(username, password):
        if username != request.view_args.get('name'):
            return False
        u = session.query(User).filter_by(username=username).first()
        if not u or not u.verify_password(password):
            return False
        g.user = u
        return True
    

    If you need to add this check to a subset of your routes, then you can do this in a separate decorator, which would be something like this:

    from flask import request, abort
    from functools import wraps
    
    def check_username(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            if auth.username != request.view_args['name']:
                abort(401)
            return f(*args, **kwargs)
    

    Then you can add the decorator to any routes that need it:

    class Avatar(Resource):
        # ...
    
        @auth.login_required
        @check_username
        def post(self, name):
            # ...