Search code examples
pythonpostgresqlpsycopg2isolation-levelautocommit

Multiple Database connections using UPDATE ... RETURNING, seem to not update rows in tasks table


Preface

I want to process tasks listed in a database table in parallel. Not looking for working code.

The Setup

  • 1 PostgreSQL database server D
  • 1 processing server P
  • 1 User terminal T

using Python 3.6, psycopg2.7.6, PostgreSQL 11

D holds tables with data to be processed and a tasks table. A user at T ssh's into P, where the following command can be issued:

python -m core.utils.task

This task.py script is essentially a while loop that gets a task t from the tasks table on D with the status 'new' until there are no new tasks left. A task t is basically a set of arguments for another function called do_something(t). do_something(t) itself will make many connections to D to get data that needs to be processed and set task's to status 'done' once it finished – the while loop starts all over and gets a new task.

In order to run python -m core.utils.task multiple times I open multiple ssh connections. Not so good, I know; threading or multiprocessing would be better. But his is just for testing if I can run the mentioned command twice.

There is a script that manages all the database interactions called pgsql.py which is needed to get a task and then by do_something(t). I adapted a singleton pattern from this SE post.

Pseudo-Code (mostly)

task.py

import mymodule
import pgsql

def main():
    while True:
        r, c = pgsql.SQL.select_task()  # rows and columns
        task = dotdict(dict(zip(c, r[0])))
        mymodule.do_something(task)

if __name__ == "__main__":
    main()

mymodule.py

import pgsql

def do_something(t):
    input = pgsql.SQL.get_images(t.table,t.schema,t.image_id,t.image_directory)
    some_other_function(input)
    pgsql.SQL.task_status(t.task_id,'done')

pgsql.py

import psycopg2 as pg

class Postgres(object):
    """Adapted from https://softwareengineering.stackexchange.com/a/358061/348371"""
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = object.__new__(cls)
            db_config = {'dbname': 'dev01', 'host': 'XXXXXXXX',
                         'password': 'YYYYY', 'port': 5432, 'user': 'admin'}
            try:
                print('connecting to PostgreSQL database...')
                connection = Postgres._instance.connection = pg.connect(**db_config)
                connection.set_session(isolation_level='READ COMMITTED', autocommit=True)
            except Exception as error:
                print('Error: connection not established {}'.format(error))
                Postgres._instance = None

            else:
                print('connection established')

        return cls._instance

    def __init__(self):
        self.connection = self._instance.connection

    def query(self, query):
        try:
            with self.connection.cursor() as cur:
                cur.execute(query)
                rows = cur.fetchall()
                cols = [desc[0] for desc in cur.description]
        except Exception as error:
            print('error execting query "{}", error: {}'.format(query, error))
            return None
        else:
            return rows, cols

    def __del__(self):
        self.connection.close()

db = Postgres()
class SQL():
    def select_task():
        s = """
            UPDATE schema.tasks
               SET status = 'ready'
             WHERE task_id = (  SELECT task_id
                                  FROM schema.tasks
                                 WHERE tasks.status = 'new'
                                 LIMIT 1)
            RETURNING *
            ;
            """.format(m=mode)
        return Postgres.query(db, s)


    def task_status(id,status):
        s = """
            UPDATE
                schema.tasks
            SET
                status = '{s}'
            WHERE
                tasks.task_id = '{id}'
            ;
            """.format(s=status,
                       id=id)
        return Postgres.query(db, s)

Problem

This works with one ssh connection. Tasks are retrieved from the database and processed, once finished the task is set to 'done'. Once I open a second ssh connection in a second terminal to run python -m core.utils.task (so to say, in parallel) the exact same rows of the tasks table are processed in both - ignoring that they have been updated.

Question

What are your suggestions to get this to work? There are millions of tasks and I need to run them in parallel. Before implementing threading or multiprocessing I wanted to test it with multiple ssh connections first, bad idea? I have fiddled around with the isolation levels and autocommit settings in psycopg2's set_session() but without luck. I checked the sessions in the Database server and can see that each process of python -m core.utils.task has its own PID, only connecting once, exactly like this singleton pattern should work. Any ideas or pointers how to deal with this are much appreciated!


Solution

  • The main problem is that performing one task is not an atomic operation. Therefore, in different ssh sessions, the same task can be processing several times.

    In this implementation, you can try to use an "INPROGRESS" status for task so as not to retrieve tasks that are already being processed (with "INPROGRESS" status). But be sure to use autocommit.

    But I would implement this using threads and database connection pool. And would extract tasks in batches using OFFSET and LIMIT. The do_something, select_task and task_status functions would implement for batch of tasks.

    Also, there is no need to implement the Postgres class as a singleton.


    Amended (see the comments below)

    • You can add FOR UPDATE SKIP LOCKED to the SQL query in current implementation (see url).
    • If you want to work with batches, then separate the data by some serial column (well, or just sort the data in a table).
    • My implementation using batches.
    • This can be implemented using ThreadPoolExecutor and PersistentConnectionPool.