I am quite new to multiprocessing and threading. I am trying to come up with a solution where I have to execute multiple Db2 queries in parallel so that I can get the results faster. I have come up with the following code:
import concurrent.futures
import multiprocessing
import os
import sys

import ibm_db

executed_queries = []

class QueryRunner:
    def __init__(self, files):
        self.files = files

    def execute_query(self, filename):
        with ibm_db.connect("DATABASE=sample;HOSTNAME=localhost;PORT=50000;USERNAME=db2admin;PASSWORD=db2admin") as conn:
            with open(filename) as f:
                query = f.read()
                print(f"Executing query: {query}")
                for i in range(3):
                    try:
                        ibm_db.exec_query(conn, query)
                        break
                    except Exception as e:
                        print(f"Query failed with error: {e}")
                else:
                    # Query failed after 3 retries
                    print(f"Query failed after 3 retries")
                    # Check if the for loop executed any iterations
                    if not i:
                        # The for loop did not execute any iterations, so replace the query with a different query
                        filename = next((f for f in self.files if f not in executed_queries and f.startswith('MYqueries'))]
                        self.execute_query(filename)

    def run(self):
        with multiprocessing.Pool(processes=15) as pool:
            results = pool.starmap(self.execute_query, [(filename,) for filename in filenames])
            for result in results:
                print(f"Query complete: {result}")

if __name__ == "__main__":
    files = os.listdir('.')
    filenames = [f for f in files if f not in executed_queries and f.startswith('MYqueries')]
    query_runner = QueryRunner(filenames)
    query_runner.run()
I keep getting a TypeError in pool.starmap(). I have all the queries stored as separate txt files in the folder MYqueries. I am reading from that folder and trying to execute multiple queries in parallel for faster operation. Any help would be great in this regard.

Thanks. Also, if I have made a lot of fundamental mistakes, do point them out; I will try to amend them all.
After using sys.exc_info() in my run() method, I am getting the following result:

    (<class 'TypeError'>, TypeError('QueryRunner.execute_query takes 2 positional arguments but 20 were given'), <traceback object at 0x0000025F1454D680>)
I probably have more questions than answers for you:
In your for/else statement you check:

    # Check if the for loop executed any iterations
    if not i:

But you only reach this code if the for loop ran to completion, leaving i with a value of 2. So I don't see how not i will ever evaluate to True. But if it does somehow evaluate to True, you have the statement:

    filename = next((f for f in self.files if f not in executed_queries and f.startswith('MYqueries'))]
This does not compile. Perhaps you meant:

    filename = next(f for f in self.files if f not in executed_queries and f.startswith('MYqueries'))
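Note also that a bare next() on an exhausted generator raises StopIteration, which would kill the worker; next() accepts a default value for exactly this case. A small self-contained illustration (the file names here are made up):

```python
files = ['MYqueries/a.sql', 'MYqueries/b.sql']
executed_queries = ['MYqueries/a.sql']

# One unexecuted file left, so next() finds it:
first = next(f for f in files if f not in executed_queries)
print(first)  # MYqueries/b.sql

executed_queries.append(first)

# Now the generator is empty; a default avoids StopIteration:
leftover = next((f for f in files if f not in executed_queries), None)
print(leftover)  # None
```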
If your intention is to get the next filename value from self.files that hasn't yet been executed, then this will not work. First and foremost, the global executed_queries is not shareable among the pool's processes, and you have no code to update this list even if it were shareable. Besides, the run method is already trying each element of filenames.
You have in method QueryRunner.run:

    results = pool.starmap(self.execute_query, [(filename,) for filename in filenames])

Is filenames defined there? Did you mean self.files (the attribute your __init__ actually sets)? And why not just:

    results = pool.map(self.execute_query, self.files)
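The distinction matters because starmap unpacks each element of its iterable into separate positional arguments, while map passes each element as a single argument. Purely as a guess, that would also explain your "... but 20 were given" TypeError: if starmap ever receives a bare 19-character filename string instead of a 1-tuple, it unpacks it into 19 characters, and self makes 20. A quick demonstration with a throwaway function:

```python
from multiprocessing.pool import ThreadPool

def count_args(*args):
    return len(args)

if __name__ == "__main__":
    with ThreadPool(2) as pool:
        # map: each element is passed as one argument
        print(pool.map(count_args, ["MYqueries/query.sql"]))      # [1]
        # starmap: each element is unpacked -- a string becomes 19 characters
        print(pool.starmap(count_args, ["MYqueries/query.sql"]))  # [19]
```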
You then print out each element of the results list, but execute_query implicitly returns None. Until execute_query returns something meaningful, there is not much point in storing and printing the results.
Then you have:

    if __name__ == "__main__":
        files = os.listdir('.')
        filenames = [f for f in files if f not in executed_queries and f.startswith('MYqueries')]
        ...

At this point in time executed_queries is empty, so why are you even testing it?
What I Would Do
Since there is a cost to creating a connection, I would create one dedicated connection for each thread in a multithreading pool (since your execute_query is mostly waiting, multithreading is a viable and more efficient option than multiprocessing). You need to figure out what the optimal number of threads is.
import threading

import ibm_db

# Module-level thread-local storage: each thread sees its own 'conn'
# attribute. This must live at module level so the cached connection
# survives across calls made by the same thread.
local_storage = threading.local()

def execute_query(filename):
    # Create a connection for this thread if it does not already exist:
    conn = getattr(local_storage, 'conn', None)
    if conn is None:
        conn = ibm_db.connect("DATABASE=sample;HOSTNAME=localhost;PORT=50000;PROTOCOL=TCPIP;UID=db2admin;PWD=db2admin", "", "")
        local_storage.conn = conn  # remember for next time
    with open(filename) as f:
        query = f.read()
    print(f"Executing query: {query}")
    for i in range(3):
        try:
            ibm_db.exec_immediate(conn, query)
        except Exception as e:
            print(f"Query failed with error: {e}")
        else:
            return  # Success
    # Query failed after 3 retries
    print("Query failed after 3 retries")

if __name__ == "__main__":
    from multiprocessing.pool import ThreadPool
    import os

    files = os.listdir('.')
    filenames = [f for f in files if f.startswith('MYqueries')]
    # 15 threads (is there a better pool size?)
    with ThreadPool(15) as pool:
        pool.map(execute_query, filenames)
Note that the way I coded execute_query, it can also be invoked directly, without being a pool worker function.