Search code examples
pythonpostgresqlparallel-processingmultiprocessingconnection-pooling

Multiprocessing and connection pool error: cannot pickle 'psycopg2.extensions.connection' object


I am trying to pass a db object that uses a connection pool in Postgres to another class object but I keep getting an error.

if __name__ == "__main__":
cores = calculate_number_of_cores()
pretty_print(f"Number of cores ==> {cores}")

db_manager = DatabaseManager()
with ProcessPoolExecutor(cores) as executor:
    while True:
        if LOCAL_RUN:
            pretty_print("ALERT: Doing a local run of the automation with limited capabilities.")

        list_of_clients = db_manager.get_clients()
        print(list_of_clients)
        random.shuffle(list_of_clients)

        list_of_not_attempted_clients_domains = db_manager.get_not_attempted_domains_that_matches_random_client_tags()
        group_of_clients_handlers = {}
        pretty_print(list_of_not_attempted_clients_domains)

        # no matches
        if not list_of_not_attempted_clients_domains:
            sleep = 60 * 10
            pretty_print(f'No matches found. Sleeping for {sleep}s')
            time.sleep(sleep)
            continue

        for client in list_of_clients:
            client_id = client[0]
            client_name = client[1]
            group_of_clients_handlers[client_id] = [ClientsHandler(db_manager), client_name]

        try:
            list(executor.map(
                partial(run, group_of_clients_handlers),
                list_of_not_attempted_clients_domains
            ))
            # for not_attempted_clients_domains in list_of_not_attempted_clients_domains:
            #     futures = executor.submit(run(group_of_clients_handlers, not_attempted_clients_domains))
            #     for future in concurrent.futures.as_completed(futures):
            #         pretty_print("Loading futures....")
            #         print(future.result())

            time.sleep(60)
            pretty_print("sleeping for 60secs")
        except Exception as err:
            pretty_print(err)

I am using a multiprocessing module in Python. My objective is to have ready connections that we use for every query to the database rather than having to always connect to the database from for every execution or process.

It's a must I pass the db_manager to the run function as I don't want to have to create a new connection there. But because of this, I am getting the error: cannot pickle 'psycopg2.extensions.connection' object.

I do not even know exactly what that means. The only thing I know is that when I submit the run function to the executor manually, the code works properly. That's the commented-out code.

I do not know any other way to navigate this. I kinda know where the error is coming from but how to solve it so it doesn't interfere with my setup is such a pain.


Solution

  • Error you get means that connection objects can't be serialized using pickle. Pickling of objects occurs on some platforms when you use multiprocessing to spawn child processes, you can read more about it in this answer.

    Pickling is not your main problem though. Even if you avoided using pickle while spawning child processes, sharing psycopg connections between processes wouldn't work. According to psycopg2 docs, you shouldn't share connections between processes.

    If you need pre-initialized connection pool and database operations are time-consuming, you should try using threading instead of multiprocessing, as according to psycopg2 docs (link above) connections are thread safe. With threads, you should be able to implement your program as desired, with pre-initialized connection pool in main thread and queries delegated to other threads using this one pool.