Tags: python, mysql, python-multithreading, mysql-python

Python: Improving performance - Writing to database in a separate thread


I am running a Python app where, for various reasons, I have to host my program on a server in one part of the world while my database lives in another.

I tested via a simple script: from my home, which is in a country neighboring the database server, the time to write and retrieve a row from the database is about 0.035 seconds (which is a nice speed imo), compared to 0.16 seconds when my Python server on the other side of the world performs the same action. This is an issue, as I am trying to keep my Python app as fast as possible, so I was wondering if there is a smart way to do this?

As I am running my code synchronously, my program waits every time it has to write to the db, which is about 3 times a second, so the time adds up. Is it possible to run the database connection in a separate thread or something, so it doesn't halt the whole program while it sends data to the database? Or can this be done using asyncio (I have no experience with async code)?

I am really struggling to figure out a good way to solve this issue. Many thanks in advance!


Solution

  • Yes, you can create a thread that does the writes in the background. In your case, it seems reasonable to have a queue where the main thread puts things to be written and the db thread gets and writes them. The queue can have a maximum depth so that when too much stuff is pending, the main thread waits. You could also do something different like drop things that happen too fast. Or, use a db with synchronization and write a local copy. You also may have an opportunity to speed up the writes a bit by committing multiple at once.

    This is a sketch of a worker thread:

    import threading
    import queue
    
    class SqlWriterThread(threading.Thread):
    
        def __init__(self, db_connect_info, maxsize=8):
            super().__init__()
            self.db_connect_info = db_connect_info
            self.q = queue.Queue(maxsize)
            # TODO: Can expose q.put directly if you don't need to
            # intercept the call
            # self.put = self.q.put
            self.start()
    
        def put(self, statement):
            print(f"DEBUG: Putting\n{statement}")
            self.q.put(statement)
    
        def run(self):
            db_conn = None
            while True:
                # get all the statements you can, waiting on first
                statements = [self.q.get()]
                try:
                    while True:
                        statements.append(self.q.get(block=False))
                except queue.Empty:
                    pass
                try:
                    # early exit before connecting if channel is closed.
                    if statements[0] is None:
                        return
                    if not db_conn:
                        db_conn = do_my_sql_connect()
                    try:
                        print("DEBUG: Executing\n", "--------\n".join(f"{id(s)} {s}" for s in statements))
                        # TODO: need to detect a closed connection, then reconnect and restart the loop
                        cursor = db_conn.cursor()
                        for statement in statements:
                            if statement is None:
                                return
                            cursor.execute(*statement)
                    finally:
                        db_conn.commit()
                finally:
                    for _ in statements:
                        self.q.task_done()
    
    sql_writer = SqlWriterThread(('user', 'host', 'credentials'))
    sql_writer.put(('execute some stuff',))
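    As for the asyncio question: a thread is the simpler fit with a blocking MySQL driver, but the same queue pattern translates. An `asyncio.Queue` feeds a consumer task, which hands the blocking DB work to a thread pool via `run_in_executor` so the event loop is never stalled. This is a minimal sketch; `execute_batch` is a hypothetical stand-in for the real connect/execute/commit work:

```python
import asyncio

async def sql_writer(q, execute_batch):
    """Consume statements from q until a None sentinel arrives.

    execute_batch stands in for the blocking DB work (connect,
    cursor.execute, commit); it runs in the default thread pool
    via run_in_executor so the event loop stays free.
    """
    loop = asyncio.get_running_loop()
    while True:
        # wait for the first statement, then drain whatever else is queued
        batch = [await q.get()]
        while not q.empty():
            batch.append(q.get_nowait())
        stop = None in batch                  # None signals shutdown
        batch = [s for s in batch if s is not None]
        if batch:
            await loop.run_in_executor(None, execute_batch, batch)
        if stop:
            return

async def main():
    written = []
    q = asyncio.Queue(maxsize=8)              # put() waits when the queue is full
    writer = asyncio.create_task(sql_writer(q, written.extend))
    await q.put(('INSERT INTO t VALUES (%s)', (1,)))
    await q.put(None)                         # request shutdown
    await writer
    return written

print(asyncio.run(main()))
```

    As with the thread version, the `maxsize` gives you backpressure: `await q.put(...)` suspends the producer when too many writes are pending instead of letting the queue grow without bound.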