
Python API: synchronous multithreaded DB queries


I have a Python Flask API that applies some SQL-based filtering to an object.

Steps of the API workflow:

  • receive a POST request (with arguments)
  • run multiple SQL read queries (against a Postgres DB) depending on some of the posted arguments
  • apply some simple "pure Python" rules to the SQL results to get a boolean result (a sketch of these two steps follows this list)
  • store the boolean result and the associated posted arguments in the postgres DB
  • return the boolean result
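
For the two middle steps, here is a minimal sketch of dispatching the read queries concurrently and then applying the rule. The query functions (count_matching_users, latest_user_id) and the combining rule are hypothetical stand-ins; the real queries and rule would take their place:

from concurrent.futures import ThreadPoolExecutor
import os

from sqlalchemy import create_engine, text

engine = create_engine(os.getenv("POSTGRES_URL"))

def count_matching_users(args):
    # hypothetical read query; stands in for one of the real filters
    with engine.connect() as conn:
        return conn.execute(text("SELECT count(*) FROM users")).scalar()

def latest_user_id(args):
    # second hypothetical read query
    with engine.connect() as conn:
        return conn.execute(text("SELECT max(id) FROM users")).scalar()

def evaluate(args):
    # dispatch both reads concurrently, then apply the pure-Python rule
    with ThreadPoolExecutor(max_workers=2) as pool:
        fut_count = pool.submit(count_matching_users, args)
        fut_latest = pool.submit(latest_user_id, args)
        # hypothetical rule combining the two results into a boolean
        return fut_count.result() > 0 and fut_latest.result() is not None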

Constraints of the API:

  • The API needs to return the boolean answer within 150 ms
  • I can store the boolean result asynchronously in the DB to avoid waiting for the write query to complete before returning the boolean result (a sketch of this follows the list)
  • However, as explained, the boolean answer depends on the SQL read queries, so I cannot run those queries asynchronously
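
One way to do the asynchronous write is to hand it off to a background worker and return immediately. A minimal sketch, assuming a hypothetical results table and a hypothetical store_result helper:

from concurrent.futures import ThreadPoolExecutor
import os

from sqlalchemy import create_engine, text

engine = create_engine(os.getenv("POSTGRES_URL"))

# single background worker so the request thread never waits on the write
write_pool = ThreadPoolExecutor(max_workers=1)

def store_result(result, args):
    # hypothetical table holding the boolean result and the posted arguments
    with engine.begin() as conn:
        conn.execute(
            text("INSERT INTO results (result, args) VALUES (:r, :a)"),
            {"r": result, "a": str(args)},
        )

def answer(result, args):
    # fire-and-forget: the write completes after the response is sent
    write_pool.submit(store_result, result, args)
    return result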

Test made: While experimenting, I noticed that I can run read queries in parallel. The test I did was:

  • Running the query below 2 times without multithreading => the code ran in roughly 10 seconds
from sqlalchemy import create_engine, text
import os
import time


engine = create_engine(
    os.getenv("POSTGRES_URL")
)

def run_query():
    with engine.connect() as conn:
        # pg_sleep(5) artificially slows the query so each run takes ~5 seconds
        rs = conn.execute(text("""
            SELECT
                *
                , pg_sleep(5)
            FROM users
        """))

        for row in rs:
            print(row)

if __name__ == "__main__":
    start = time.time()
    for i in range(2):
        run_query()

    end = time.time() - start
    print(f"elapsed: {end:.1f}s")
  • Running the same query 5 times using multithreading => the code ran in roughly 5 seconds
from sqlalchemy import create_engine, text
import os
import threading
import time

engine = create_engine(
    os.getenv("POSTGRES_URL")
)

def run_query():
    with engine.connect() as conn:
        # same ~5 second query as above, one per thread
        rs = conn.execute(text("""
            SELECT
                *
                , pg_sleep(5)
            FROM users
        """))

        for row in rs:
            print(row)

if __name__ == "__main__":
    start = time.time()
    threads = []
    for i in range(5):
        t = threading.Thread(target=run_query)
        t.start()
        threads.append(t)

    for t in threads:
        t.join()
    end = time.time() - start
    print(f"elapsed: {end:.1f}s")

Question:

  • What is the bottleneck of this code? I'm sure there must be a maximum number of read queries that I can run in parallel in one API call, but I'm wondering what determines that limit.

Thank you very much for your help!


Solution

  • This scales well beyond the point where it is sensible. With some tweaks to the built-in connection pool's pool_size, you could easily have 100 pg_sleep calls running simultaneously (see the first sketch below). But as soon as you change that to do real work rather than just sleeping, it would fall apart: you only have so many CPUs and so many disk drives, and that number is probably far less than 100.

    You should start by looking at those read queries to see why they are slow and whether they can be made faster with indexes or something; the second sketch below shows the usual starting point.
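
For reference, the pool limits mentioned above are configured on create_engine. A minimal sketch of the relevant knobs (the values here are illustrative, not a recommendation):

from sqlalchemy import create_engine
import os

# SQLAlchemy's default QueuePool keeps pool_size=5 persistent connections
# plus up to max_overflow=10 temporary ones; raising these allows more
# queries in flight at once, at the cost of more concurrent load on Postgres
engine = create_engine(
    os.getenv("POSTGRES_URL"),
    pool_size=20,
    max_overflow=10,
)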
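
And a starting point for inspecting a slow read query: EXPLAIN ANALYZE runs the query and reports where the time goes. The WHERE clause here is a hypothetical placeholder for one of the real filters:

from sqlalchemy import create_engine, text
import os

engine = create_engine(os.getenv("POSTGRES_URL"))

with engine.connect() as conn:
    # EXPLAIN ANALYZE executes the query and shows where the time goes,
    # e.g. a sequential scan that an index on the filtered column would fix
    plan = conn.execute(
        text("EXPLAIN ANALYZE SELECT * FROM users WHERE email = :e"),
        {"e": "someone@example.com"},
    )
    for line in plan:
        print(line[0])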