I am trying to run multiple queries in parallel with PyGreSQL and multiprocessing, but the code below hangs without returning:
from pg import DB
from multiprocessing import Pool
from functools import partial
def create_query(table_name):
    return f"""create table {table_name} (id integer);
    CREATE INDEX ON {table_name} USING BTREE (id);"""
my_queries = [ create_query('foo'), create_query('bar'), create_query('baz') ]
def execute_query(conn_string, query):
    con = DB(conn_string)
    con.query(query)
    con.close()
rs_conn_string = "host=localhost port=5432 dbname=postgres user=postgres password="
pool = Pool(processes=len(my_queries))
pool.map(partial(execute_query,rs_conn_string), my_queries)
Is there any way to make it work? Also, is it possible to run the 3 queries in the same "transaction", so that if one query fails the others get rolled back?
One obvious problem is that you always run pool.map, not only in the main process but also whenever the interpreters used in the parallel sub-processes import the script (this happens, for example, with the spawn start method, which is the default on Windows). You should do something like this instead:
def run_all():
    with Pool(processes=len(my_queries)) as pool:
        pool.map(partial(execute_query, rs_conn_string), my_queries)

if __name__ == '__main__':
    run_all()
Regarding your second question, that's not possible, since transactions are per connection, and the connections live in separate processes if you do it like that.
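If atomicity matters more than parallelism, one option is to give up the pool and run the statements one after another on a single connection inside one transaction. Here is a minimal sketch of that idea, not a drop-in solution: the helper name run_in_one_transaction is made up for illustration, and it assumes con is an open pg.DB connection (PyGreSQL's DB wrapper provides begin(), commit() and rollback()). Since DDL is transactional in PostgreSQL, the tables created so far disappear again on rollback.

```python
def run_in_one_transaction(con, queries):
    """Run all queries sequentially on one connection, all or nothing.

    `con` is assumed to be an open pg.DB connection, or any object that
    offers begin(), query(), commit() and rollback().
    """
    con.begin()  # start an explicit transaction
    try:
        for query in queries:
            con.query(query)
    except Exception:
        con.rollback()  # undo every statement executed so far
        raise
    con.commit()  # make all changes permanent together


# Hypothetical usage with the names from the question:
#     from pg import DB
#     con = DB(rs_conn_string)
#     try:
#         run_in_one_transaction(con, my_queries)
#     finally:
#         con.close()
```

The price is that the queries no longer run in parallel, because a single connection processes one statement at a time.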
Asynchronous command processing might be what you want, but it is not yet supported by PyGreSQL. Psycopg + aiopg is probably better suited for doing things like that.