python multiprocessing psycopg2 multiprocess

Can't pickle psycopg2.extensions.connection objects when using pool.imap, but can be done in individual processes


I am trying to build an application which will "check out" a cell, which is a square covering a part of land in a geographic database, and perform an analysis of the features within that cell. Since I have many cells to process, I am using a multiprocessing approach.

I had it somewhat working inside of my object like this:

class DistributedGeographicConstraintProcessor:

    ...

    def _process_cell(self, conn_string):

        conn = pg2.connect(conn_string)
        try:
            cur = conn.cursor()

            cell_id = self._check_out_cell(cur)
            conn.commit()
            print(f"processing cell_id {cell_id}...")

            for constraint in self.constraints:
                # print(f"processing {constraint.name()}...")
                query = constraint.prepare_distributed_query(self.job, self.grid)
                cur.execute(query, {
                    "buffer": constraint.buffer(),
                    "cell_id": cell_id,
                    "name": constraint.name(),
                    "simplify_tolerance": constraint.simplify_tolerance()
                })

            # TODO: do a final race condition check to further suppress duplicates
            self._check_in_cell(cur, cell_id)
            conn.commit()

        finally:
            del cur
            conn.close()

        return None

    def run(self):

        while True:
            if not self._job_finished():
                params = [self.conn_string] * self.num_cores
                processes = []
                for param in params:
                    process = mp.Process(target=self._process_cell, args=(param,))
                    processes.append(process)
                    sleep(0.1)  # Prevent multiple processes from checking out the same grid square
                    process.start()
                for process in processes:
                    process.join()
            else:
                self._finalize_job()
                break

The problem is that it starts only four processes and waits until all of them have finished before starting four new ones.

I want to make it so when one process finishes its work, it will begin working on the next cell immediately, even if its co-processes are not yet finished.

I am unsure how to implement this. I tried using a pool like this:

def run(self):

    pool = mp.Pool(self.num_cores)
    unprocessed_cells = self._unprocessed_cells()
    for i in pool.imap(self._process_cell, unprocessed_cells):
        print(i)

But this fails with an error saying that the connection cannot be pickled:

TypeError: can't pickle psycopg2.extensions.connection objects

I do not understand why, because the function I pass to imap is exactly the same one I use as the Process target.

I have already looked at these threads, here is why they do not answer my question:


Solution

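  • My guess is that you're attaching some connection object to self. A pool has to pickle the bound method self._process_cell to send it to its worker processes, and pickling a bound method pickles self together with every attribute stored on it. mp.Process under the fork start method pickles nothing, because the child simply inherits the parent's memory, which would explain why the same function works there. Try to rewrite your solution using functions only (no classes/methods).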
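The pickling behaviour can be reproduced without a database. The sketch below is only an illustration: the Processor class is a hypothetical stand-in for the asker's class, and a threading.Lock plays the role of the connection purely because it, too, cannot be pickled. It shows that pickling a bound method fails when the instance holds such an attribute, while a module-level function pickles without trouble.

```python
import pickle
import threading


class Processor:
    """Hypothetical stand-in for the asker's class."""

    def __init__(self):
        # A lock is used only because, like a database connection,
        # it cannot be pickled. No real connection is opened here.
        self.conn = threading.Lock()

    def process_cell(self, cell_id):
        return cell_id


def process_cell(cell_id):
    """Module-level function: pickled by reference, carries no instance state."""
    return cell_id


def can_pickle(obj):
    """Return True if pickle.dumps accepts obj, False if it refuses."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# Pickling the bound method drags the whole instance (and its lock) along.
bound_ok = can_pickle(Processor().process_cell)
# The plain function has nothing attached to it.
plain_ok = can_pickle(process_cell)
print(bound_ok, plain_ok)
```

If the same check fails for the real self._process_cell, looking through the instance's attributes for a stored connection or cursor is a reasonable next step.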

    Here is a simplified version of a single producer/multiple workers solution I used some time ago:

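    import time
    from multiprocessing import Pool


    def worker(param):
        # connect to pg here, inside the worker process, so that no
        # connection object ever has to be pickled
        # do work
        pass


    def main():
        # NUM_PROC and params are defined elsewhere
        pool = Pool(processes=NUM_PROC)
        tasks = []
        for param in params:
            t = pool.apply_async(worker, args=(param,))
            tasks.append(t)
        pool.close()  # no more tasks will be submitted
        # poll until every task reports that it has finished
        finished = False
        while not finished:
            finished = True
            for t in tasks:
                if not t.ready():
                    finished = False
                    break
            time.sleep(1)
        pool.join()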
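For the scheduling behaviour asked about in the question (a free worker picks up the next cell immediately), Pool.imap_unordered with a module-level function may be enough. The following is a minimal sketch, not the asker's actual code: FakeConnection is a hypothetical stand-in so the example runs without a database, the squaring is a placeholder for the real per-cell analysis, and it assumes the cell IDs can be listed up front, as the question's _unprocessed_cells() suggests.

```python
import multiprocessing as mp


class FakeConnection:
    """Hypothetical stand-in for a psycopg2 connection (no database needed)."""

    def __init__(self, conn_string):
        self.conn_string = conn_string

    def close(self):
        pass


def process_cell(task):
    """Runs in a worker process and opens its own connection there."""
    conn_string, cell_id = task
    # In real code this would be pg2.connect(conn_string).
    conn = FakeConnection(conn_string)
    try:
        # Placeholder for the real per-cell analysis.
        return cell_id * cell_id
    finally:
        conn.close()


def run(conn_string, cell_ids, num_procs):
    # Only plain strings and integers are sent to the workers,
    # so nothing unpicklable crosses the process boundary.
    tasks = [(conn_string, cell_id) for cell_id in cell_ids]
    with mp.Pool(num_procs) as pool:
        # Each idle worker takes the next pending task, and results
        # are yielded in completion order rather than submission order.
        return sorted(pool.imap_unordered(process_cell, tasks))


if __name__ == "__main__":
    print(run("dbname=example", range(8), num_procs=4))
```

Because the parent hands out the cell IDs here, each cell goes to exactly one worker, so the sleep used in the question to keep processes from checking out the same cell should not be needed under this assumption.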