
Using python multiprocessing to execute more than 100 db2 queries


I am quite new to multiprocessing and threading. I need to execute multiple Db2 queries in parallel so that I get the results faster.

I have come up with the following code:

import concurrent.futures
import multiprocessing
import os
import sys
import ibm_db

executed_queries = []


class QueryRunner:
    def __init__(self, files):
        self.files = files

    def execute_query(self, filename):
        with ibm_db.connect("DATABASE=sample;HOSTNAME=localhost;PORT=50000;USERNAME=db2admin;PASSWORD=db2admin") as conn:
            with open(filename) as f:
                query = f.read()
            print(f"Executing query: {query}")

            for i in range(3):
                try:
                    ibm_db.exec_query(conn, query)
                    break
                except Exception as e:
                    print(f"Query failed with error: {e}")
            else:
                # Query failed after 3 retries
                print(f"Query failed after 3 retries")
                # Check if the for loop executed any iterations
                if not i:
                    # The for loop did not execute any iterations, so replace the query with a different query
                    filename = next((f for f in self.files if f not in executed_queries and f.startswith('MYqueries'))]
                    self.execute_query(filename)

    def run(self):
        with multiprocessing.Pool(processes=15) as pool:
            results = pool.starmap(self.execute_query, [(filename,) for filename in filenames])

            for result in results:
                print(f"Query complete: {result}")

if __name__ == "__main__":
    files = os.listdir('.')
    filenames = [f for f in files if f not in executed_queries and f.startswith('MYqueries')]
    query_runner = QueryRunner(filenames)
    query_runner.run()

I keep getting a TypeError in pool.starmap().

All the queries are stored as separate txt files in the folder MYqueries. I read the files from that folder and try to execute the queries in parallel to speed things up. Any help would be appreciated.

Also, if I have made fundamental mistakes, please point them out and I will try to fix them.

After calling sys.exc_info() in my run() method, I get the following: (<class 'TypeError'>, TypeError('QueryRunner.execute_query takes 2 positional arguments but 20 were given'), <traceback object at 0x0000025F1454D680>)


Solution

  • I probably have more questions than answers for you:

    In your for/else statement you check:

                    # Check if the for loop executed any iterations
                    if not i:
    

    But the else clause of a for loop runs only when the loop finishes without hitting break, which here means all three attempts failed. At that point i is 2, so not i is always False and the code under it never runs. If it somehow did run, you have the statement:

                        filename = next((f for f in self.files if f not in executed_queries and f.startswith('MYqueries'))]
    
    

    This is a syntax error: the closing ] has no matching [. Perhaps you meant:

                        filename = next(f for f in self.files if f not in executed_queries and f.startswith('MYqueries'))
    

    If your intention is to get the next filename value from self.files that hasn't yet been executed, then this will not work. First and foremost, global executed_queries is not sharable among the pool's processes and you have no code to update this list even if it were sharable. Besides, the run method is already trying each element of filenames.
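
    The for/else behavior discussed above can be checked in isolation. This is a minimal sketch in which attempt, always_fails and always_works are made-up names and no database is involved. It only shows which branch runs and what i holds when it does:

```python
def attempt(action, retries=3):
    """Call action up to `retries` times and report how the loop ended."""
    for i in range(retries):
        try:
            action()
            break  # success: the else clause below is skipped
        except Exception:
            pass  # failure: fall through to the next attempt
    else:
        # Reached only when the loop was never broken out of,
        # i.e. every attempt failed. i holds the last index here.
        return ("all attempts failed", i, not i)
    return ("succeeded", i, not i)


def always_fails():
    # Hypothetical stand-in for a query that raises every time.
    raise RuntimeError("simulated query failure")


def always_works():
    # Hypothetical stand-in for a query that succeeds at once.
    return None


print(attempt(always_fails))
print(attempt(always_works))
```

    In the failing case the else branch sees i equal to 2, so a test on not i inside it cannot pass.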

    You have in method QueryRunner.run:

                results = pool.starmap(self.execute_query, [(filename,) for filename in filenames])
    

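    Here filenames is not the list stored on the instance (that is self.files). It resolves to the global assigned under if __name__ == "__main__":, so the method only works by accident. Did you mean self.files?

    Why not just:

                results = pool.map(self.execute_query, self.files)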
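
    The difference between map and starmap may also explain the reported TypeError. This is an inference, since the posted code does wrap each name in a tuple: if starmap is handed bare strings, each string is unpacked character by character, and a 19-character file name plus self would give the "20 were given" in the message. A small sketch, using a thread pool and a made-up execute_query so it runs without a database:

```python
from multiprocessing.pool import ThreadPool


def execute_query(filename):
    # Hypothetical stand-in for the real worker: just echo the argument.
    return f"ran {filename}"


filenames = ["MYqueries_001.txt", "MYqueries_002.txt"]  # made-up names
error = None

with ThreadPool(2) as pool:
    # map passes each element to the worker as a single argument.
    mapped = pool.map(execute_query, filenames)

    # starmap unpacks each element, so each one must be a tuple of arguments.
    starmapped = pool.starmap(execute_query, [(name,) for name in filenames])

    # Given bare strings, starmap unpacks every character as its own
    # argument, and the worker call fails with a TypeError.
    try:
        pool.starmap(execute_query, filenames)
    except TypeError as exc:
        error = exc

print(mapped == starmapped)
print(type(error).__name__)
```

    With a single argument per call, map is the simpler choice.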
    

    You then print out each element in the results list, but execute_query implicitly returns None. Until execute_query returns something meaningful, there is not much point in storing and printing the results.
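
    One way to make the results worth collecting is to have the worker return a status for each file. The following is only a sketch: run_with_retries and the simulated failure are assumptions for illustration, and the real database call would replace task.

```python
from multiprocessing.pool import ThreadPool


def run_with_retries(task, attempts=3):
    """Run task() up to `attempts` times and return (ok, value_or_error)."""
    last_error = None
    for _ in range(attempts):
        try:
            return True, task()
        except Exception as exc:
            last_error = exc
    return False, repr(last_error)


def execute_query(filename):
    # Hypothetical stand-in for the database call:
    # file names containing "bad" always fail.
    def task():
        if "bad" in filename:
            raise RuntimeError("simulated failure")
        return "0 rows"

    ok, detail = run_with_retries(task)
    return filename, ok, detail


with ThreadPool(2) as pool:
    results = pool.map(execute_query, ["MYqueries_a.txt", "MYqueries_bad.txt"])

for filename, ok, detail in results:
    print(f"{filename}: {'ok' if ok else 'failed'} ({detail})")
```

    The caller can then tell which files need attention without any shared list of executed queries.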

    Then you have:

    
    if __name__ == "__main__":
        files = os.listdir('.')
        filenames = [f for f in files if f not in executed_queries and f.startswith('MYqueries')]
        ...
    

    At this point in time executed_queries is empty so why are you even testing this?

    What I Would Do

    Since there is a cost to creating a connection, I would create one dedicated connection for each thread in a multithreading pool. Because your execute_query spends most of its time waiting on the database, multithreading is a viable and more efficient option than multiprocessing. You will need to experiment to find the optimal number of threads.

    import os
    import threading
    from multiprocessing.pool import ThreadPool

    import ibm_db

    # One thread-local namespace shared by all calls. It must live at module
    # level: creating it inside the function would give every call a fresh,
    # empty object and therefore a new connection each time.
    local_storage = threading.local()

    def execute_query(filename):
        # Create a connection for this thread if it does not already exist:
        conn = getattr(local_storage, 'conn', None)
        if conn is None:
            # ibm_db.connect takes the connection string followed by user and
            # password, which stay empty when the string carries the
            # credentials. Adjust the keywords to your environment.
            conn = ibm_db.connect("DATABASE=sample;HOSTNAME=localhost;PORT=50000;PROTOCOL=TCPIP;UID=db2admin;PWD=db2admin", "", "")
            local_storage.conn = conn # remember for next time

        # The connection is deliberately left open so the thread can reuse it.
        with open(filename) as f:
            query = f.read()
        print(f"Executing query: {query}")

        for _ in range(3):
            try:
                ibm_db.exec_immediate(conn, query)
            except Exception as e:
                print(f"Query failed with error: {e}")
            else:
                return # Success

        # Query failed after 3 retries
        print("Query failed after 3 retries")

    if __name__ == "__main__":
        files = os.listdir('.')
        filenames = [f for f in files if f.startswith('MYqueries')]
        # 15 threads (is there a better pool size?)
        with ThreadPool(15) as pool:
            pool.map(execute_query, filenames)
    

    Note that, because execute_query creates its connection lazily on first use, it can also be called directly, outside of a pool.
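
    The one-connection-per-thread pattern can be tried without a Db2 server. The sketch below swaps in sqlite3 from the standard library as a stand-in driver, which is an assumption made purely so the example runs anywhere. get_connection and connections_opened are illustrative names and not part of any driver.

```python
import sqlite3
import threading
from multiprocessing.pool import ThreadPool

local_storage = threading.local()  # one namespace; its attributes are per thread
connections_opened = []            # bookkeeping for this demonstration only
lock = threading.Lock()


def get_connection():
    """Return this thread's connection, creating it on first use."""
    conn = getattr(local_storage, "conn", None)
    if conn is None:
        # sqlite3 stands in for the real database driver here.
        conn = sqlite3.connect(":memory:")
        local_storage.conn = conn
        with lock:
            connections_opened.append(threading.get_ident())
    return conn


def execute_query(query):
    # Run the query on the calling thread's own connection.
    return get_connection().execute(query).fetchone()[0]


queries = [f"SELECT {n} * 2" for n in range(20)]

# Called directly, outside any pool: the main thread gets its own connection.
direct = execute_query("SELECT 1 + 1")

with ThreadPool(4) as pool:
    results = pool.map(execute_query, queries)

print(direct, results[:3], len(connections_opened))
```

    Twenty queries run here on at most five connections (one for the main thread and up to four for the pool), which is the saving the thread-local lookup is meant to provide.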