python-3.x postgresql encryption sqlalchemy pgcrypto

Python SQLAlchemy: use pgp_sym_decrypt on columns instead of pgp_sym_encrypt on the variable in the WHERE clause of the compiled query


I use a model class for each table, with a method for every SQL operation, using SQLAlchemy.

I installed pgp_sym_encrypt and pgp_sym_decrypt through the pgcrypto extension of PostgreSQL:

CREATE EXTENSION IF NOT EXISTS pgcrypto WITH SCHEMA my_database;

Then, following the SQLAlchemy documentation, I set up my User class so that the data is encrypted with pgp_sym_encrypt, a function of PostgreSQL's pgcrypto module (I use version 13.4).

For security, I encrypt the user's data, and I would like to use PostgreSQL's native pgp_sym_encrypt and pgp_sym_decrypt functions. Previously I did that with plain-text queries through psycopg2. Now I use the SQLAlchemy ORM, through its Flask-SQLAlchemy extension.

I found a solution at: https://docs.sqlalchemy.org/en/14/core/custom_types.html#applying-sql-level-bind-result-processing

But none of the examples there show how to use these functions within a query.

Currently my code is as follows:

import uuid
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, String, DateTime, func, type_coerce, TypeDecorator
from sqlalchemy.dialects.postgresql import UUID, BYTEA
from instance.config import ENCRYPTION_KEY

db = SQLAlchemy()


class PGPString(TypeDecorator):
    impl = BYTEA

    cache_ok = True

    def __init__(self, passphrase):
        super(PGPString, self).__init__()
        self.passphrase = passphrase

    def bind_expression(self, bindvalue):
        # convert the bind's type from PGPString to
        # String, so that it's passed to psycopg2 as is without
        # a dbapi.Binary wrapper
        bindvalue = type_coerce(bindvalue, String)
        return func.pgp_sym_encrypt(bindvalue, self.passphrase)

    def column_expression(self, col):
        return func.pgp_sym_decrypt(col, self.passphrase)
    
    
class User(db.Model):
    __tablename__ = "user"
    __table_args__ = {'schema': 'private'}
    id = Column('id', UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    sexe = Column("sexe", String)
    username = Column("username", PGPString(ENCRYPTION_KEY), unique=True)
    firstname = Column("firstname", PGPString(ENCRYPTION_KEY), unique=True)
    lastname = Column("lastname", PGPString(ENCRYPTION_KEY), unique=True)
    email = Column("email", PGPString(ENCRYPTION_KEY), unique=True)
    password = Column("password", PGPString(ENCRYPTION_KEY), unique=True)
    registration_date = Column("registration_date", DateTime)
    validation_date = Column("validation_date", DateTime)
    role = Column("role", String)

    @classmethod
    def create_user(cls, **kw):
        """
        Create an User
        """
        obj = cls(**kw)
        db.session.add(obj)
        db.session.commit()

Everything works fine: when I use my create_user method, the user's data is encrypted.

But if I add a method to my class to check whether the user exists, by checking the username, email, or other data, like this:

class User(db.Model):
    ....
    @classmethod
    def check_user_exist(cls, **kwargs):
        """
        Check if user exist
        :param kwargs:
        :return:
        """
        exists = db.session.query(cls).with_entities(cls.username).filter_by(**kwargs).first() is not None
        return exists

this produces the query below:

SELECT private."user".id AS private_user_id,
       private."user".sexe AS private_user_sexe,
       pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY)  AS private_user_username,
       pgp_sym_decrypt(private."user".firstname, ENCRYPTION_KEY) AS private_user_firstname,
       pgp_sym_decrypt(private."user".lastname, ENCRYPTION_KEY)  AS private_user_lastname,
       pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY)     AS private_user_email,
       pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY)  AS private_user_password,
       private."user".registration_date AS private_user_registration_date,
       private."user".validation_date   AS private_user_validation_date,
       private."user".role              AS private_user_role

FROM private."user"
WHERE private."user".username = pgp_sym_encrypt(username, ENCRYPTION_KEY)
  AND private."user".email = pgp_sym_encrypt(email, ENCRYPTION_KEY)
  AND private."user".password = pgp_sym_encrypt(password, ENCRYPTION_KEY);

This query returns None because pgp_sym_encrypt produces a different ciphertext every time it is called, even with the same ENCRYPTION_KEY and the same value, so the freshly encrypted variable never equals the stored column.

https://www.postgresql.org/docs/13/pgcrypto.html#id-1.11.7.34.8
https://en.wikipedia.org/wiki/Pretty_Good_Privacy#/media/File:PGP_diagram.svg
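To illustrate why comparing against a freshly encrypted value cannot match, here is a small toy sketch in pure Python. It is not PGP and not secure; toy_encrypt and toy_decrypt are hypothetical helpers that only mimic one property of pgp_sym_encrypt, namely that a fresh random salt is mixed in on every call.

```python
import hashlib
import os


def toy_encrypt(plaintext: str, key: str) -> bytes:
    """Toy randomized encryption: NOT PGP and NOT secure, illustration only."""
    salt = os.urandom(8)  # fresh random salt on every call
    data = plaintext.encode("utf-8")
    # Derive a keystream from salt + key, then XOR it with the data.
    keystream = hashlib.shake_256(salt + key.encode("utf-8")).digest(len(data))
    body = bytes(a ^ b for a, b in zip(data, keystream))
    return salt + body  # the salt is stored next to the ciphertext


def toy_decrypt(ciphertext: bytes, key: str) -> str:
    """Reverse toy_encrypt by reading the salt back from the ciphertext."""
    salt, body = ciphertext[:8], ciphertext[8:]
    keystream = hashlib.shake_256(salt + key.encode("utf-8")).digest(len(body))
    return bytes(a ^ b for a, b in zip(body, keystream)).decode("utf-8")


stored = toy_encrypt("alice", "secret")  # what sits in the table
probe = toy_encrypt("alice", "secret")   # what "column = encrypt(value)" compares against

# The two ciphertexts differ (barring a 1-in-2**64 salt collision),
# so an equality test on ciphertexts finds nothing.
print("ciphertexts equal:", stored == probe)
# Decrypting the stored value and comparing plaintexts does match.
print("plaintexts equal:", toy_decrypt(stored, "secret") == "alice")
```

The second comparison is the shape the WHERE clause needs: decrypt the column, then compare it with the plain value.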

So how can I modify the PGPString(TypeDecorator) class from the SQLAlchemy documentation so that pgp_sym_decrypt is applied to the columns, rather than pgp_sym_encrypt to the variable, as in the query below:

SELECT private."user".id AS private_user_id,
       private."user".sexe AS private_user_sexe,
       pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY)  AS private_user_username,
       pgp_sym_decrypt(private."user".firstname, ENCRYPTION_KEY) AS private_user_firstname,
       pgp_sym_decrypt(private."user".lastname, ENCRYPTION_KEY)  AS private_user_lastname,
       pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY)     AS private_user_email,
       pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY)  AS private_user_password,
       private."user".registration_date AS private_user_registration_date,
       private."user".validation_date   AS private_user_validation_date,
       private."user".role              AS private_user_role
FROM private."user"
WHERE
      pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY) = 'username'
    AND pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY)  = 'email'
    AND   pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY) = 'password';

Usually I do well with Python and SQLAlchemy, but this is really beyond my skills, and I don't really know how to modify this class since it touches functionality that is completely unknown to me.

For the moment I use the text function, but this is ugly.

For example, here is another method that gets the id from the username (encrypted data):

    @classmethod
    def get_id(cls, username):
        # requires: from sqlalchemy import text
        # "user" is a reserved word in PostgreSQL, so it must be quoted everywhere
        query = """
        SELECT private."user".id
        FROM private."user"
        WHERE pgp_sym_decrypt(private."user".username, :ENCRYPTION_KEY) = :username;
        """
        q = db.session.query(cls).from_statement(text(query)).params(ENCRYPTION_KEY=ENCRYPTION_KEY,
                                                                     username=username).first()
        return q.id

Thanks a lot for your help ! Best Regards


Solution

  • Solved by adding:

    class PGPString(TypeDecorator):
        (...)
        def pgp_sym_decrypt(self, col):
            return func.pgp_sym_decrypt(col, self.passphrase, type_=String)
    

    to PGPString, and using it like this:

    class User(db.Model):
        (...)
        @classmethod
        def check_user_exist(cls, **kw):
            exists = db.session.query(cls).with_entities(cls.username)\
                .filter(func.pgp_sym_decrypt(User.username, ENCRYPTION_KEY, type_=String) == kw['username'])\
                .filter(func.pgp_sym_decrypt(User.email, ENCRYPTION_KEY, type_=String) == kw['email'])\
                .filter(func.pgp_sym_decrypt(User.password, ENCRYPTION_KEY, type_=String) == kw['password'])\
                .first() is not None
            return exists
    
    

    thanks to sqlalchemy team: https://github.com/sqlalchemy/sqlalchemy/discussions/7268
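To see the difference between the two filters without a database, the statements can be compiled against the PostgreSQL dialect and their WHERE clauses compared. This is a minimal sketch using SQLAlchemy Core (1.4 or later) rather than the Flask-SQLAlchemy model above; the users table, the KEY passphrase, and the where_clause helper are illustrative assumptions, and the exact bind parameter names in the output may vary.

```python
from sqlalchemy import Column, MetaData, String, Table, func, select, type_coerce
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import BYTEA
from sqlalchemy.types import TypeDecorator

KEY = "demo-key"  # hypothetical passphrase, stands in for ENCRYPTION_KEY


class PGPString(TypeDecorator):
    """Same type as in the question: encrypt binds, decrypt selected columns."""
    impl = BYTEA
    cache_ok = True

    def __init__(self, passphrase):
        super().__init__()
        self.passphrase = passphrase

    def bind_expression(self, bindvalue):
        bindvalue = type_coerce(bindvalue, String)
        return func.pgp_sym_encrypt(bindvalue, self.passphrase)

    def column_expression(self, col):
        return func.pgp_sym_decrypt(col, self.passphrase)


# Hypothetical one-column stand-in for the private."user" table.
users = Table("user", MetaData(), Column("username", PGPString(KEY)), schema="private")

# Naive filter: the right-hand value inherits PGPString, so it gets encrypted.
naive = select(users.c.username).where(users.c.username == "alice")

# Fixed filter: decrypt the column explicitly and type the result as String,
# so the right-hand value is sent as a plain string.
fixed = select(users.c.username).where(
    func.pgp_sym_decrypt(users.c.username, KEY, type_=String) == "alice"
)


def where_clause(stmt):
    """Compile for PostgreSQL and return only the text after WHERE."""
    sql = str(stmt.compile(dialect=postgresql.dialect()))
    return sql.split("WHERE", 1)[1].strip()


naive_where = where_clause(naive)
fixed_where = where_clause(fixed)
print("naive:", naive_where)
print("fixed:", fixed_where)
```

The naive clause compares the raw column with pgp_sym_encrypt(...), while the fixed clause compares pgp_sym_decrypt(column, key) with the plain value. Passing type_=String matters here: it tells SQLAlchemy the function returns a string, so the compared value is not run through PGPString's bind_expression again.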