Search code examples
pythonpython-3.xmultithreadingpython-multithreading

Python multithreading: Obtain worker thread results in main thread


On Python 3.8, I have implemented multithreading for a network I/O task in which a bunch of worker threads download some data off of the network, process it and create their individual list of results. Now, when all threads finish, I want the main thread to obtain all the worker threads' result lists and process further.

For this discussion, I've eliminated the network I/O calls and introduced some dummy code. This is how it looks:

from queue import Queue
from threading import Thread
from random import randint as ri

class DownloadWorker(Thread):
    def __init__(self, queue, result_q):
        Thread.__init__(self)
        self.queue = queue
        self.result_q = result_q

    def run(self):
        while True:
            start_val = self.queue.get()
            try:
                # dummy code. Real code has network calls here
                thread_output = [ri(0, 10) + start_val, ri(0, 10) + start_val, ri(0, 10) + start_val]
                self.result_q.put(thread_output)
            finally:
                self.queue.task_done()

def main():
    queue = Queue()  # Communication between main thread and its workers
    result_q = Queue()  # Result queue so workers results can finally be pooled together by main thread

    # Create 2 worker threads
    for x in range(2):
        worker = DownloadWorker(queue, result_q)
        # Setting daemon to True will let the main thread exit even if worker threads block
        worker.daemon = True
        worker.start()

    start_values = [10, 100]  # pass start value to differentiate between thread outputs
    for start_val in start_values:
        queue.put(start_val)
    queue.join()

    # Both workers tasks done. Now let's pool the results(just printing here for simiplification..)
    while not result_q.empty():
        print(result_q.get())


if __name__ == '__main__':
    main()

This code works well so far but I want to know if there is a better way to pool results in main thread using multithreading in Python 3.8. I looked at this old thread but it throws errors when I change it as per my requirement(frankly I don't understand that solution there very well).

Appreciate some pointers on this!


Solution

  • You have invented your own thread pooling which has has already been provided by the ThreadPoolExecutor class in the concurrent.futures module:

    import concurrent.futures
    from random import randint as ri
    
    
    def worker(start_val):
        # dummy code. Real code has network calls here
        return [ri(0, 10) + start_val, ri(0, 10) + start_val, ri(0, 10) + start_val]
    
    
    def main():
        NUMBER_THREADS = 2
        with concurrent.futures.ThreadPoolExecutor(max_workers=NUMBER_THREADS) as executor:
            start_values = [10, 100]  # pass start value to differentiate between thread outputs
            # method submit returns a Future instance, which encapsulates the asynchronous execution of a callable:
            futures = [executor.submit(worker, start_val) for start_val in start_values]
            for future in futures:
                result = future.result() # block until a result is returned
                print(result)
            # or you can do: results = executor.map(worker, start_values)
    
    if __name__ == '__main__':
        main()
    

    Prints:

    [20, 14, 11]
    [104, 104, 108]