On Python 3.8, I have implemented multithreading for a network I/O task: a set of worker threads downloads some data off the network, processes it, and each thread builds its own list of results. When all threads finish, I want the main thread to collect every worker's result list and process them further.
For this discussion, I've replaced the network I/O calls with dummy code. This is how it looks:
from queue import Queue
from threading import Thread
from random import randint as ri


class DownloadWorker(Thread):
    def __init__(self, queue, result_q):
        Thread.__init__(self)
        self.queue = queue
        self.result_q = result_q

    def run(self):
        while True:
            start_val = self.queue.get()
            try:
                # dummy code. Real code has network calls here
                thread_output = [ri(0, 10) + start_val, ri(0, 10) + start_val, ri(0, 10) + start_val]
                self.result_q.put(thread_output)
            finally:
                self.queue.task_done()


def main():
    queue = Queue()  # Communication between main thread and its workers
    result_q = Queue()  # Result queue so workers' results can finally be pooled together by main thread

    # Create 2 worker threads
    for x in range(2):
        worker = DownloadWorker(queue, result_q)
        # Setting daemon to True will let the main thread exit even if worker threads block
        worker.daemon = True
        worker.start()

    start_values = [10, 100]  # pass start value to differentiate between thread outputs
    for start_val in start_values:
        queue.put(start_val)
    queue.join()

    # Both workers' tasks are done. Now let's pool the results (just printing here for simplification)
    while not result_q.empty():
        print(result_q.get())


if __name__ == '__main__':
    main()
This code works so far, but is there a better way to pool the results in the main thread with multithreading in Python 3.8? I looked at this old thread, but its solution throws errors when I adapt it to my requirements (frankly, I don't understand that solution very well).
Appreciate some pointers on this!
You have invented your own thread pooling, which is already provided by the ThreadPoolExecutor class in the concurrent.futures module:
import concurrent.futures
from random import randint as ri


def worker(start_val):
    # dummy code. Real code has network calls here
    return [ri(0, 10) + start_val, ri(0, 10) + start_val, ri(0, 10) + start_val]


def main():
    NUMBER_THREADS = 2
    with concurrent.futures.ThreadPoolExecutor(max_workers=NUMBER_THREADS) as executor:
        start_values = [10, 100]  # pass start value to differentiate between thread outputs
        # method submit returns a Future instance, which encapsulates the asynchronous execution of a callable:
        futures = [executor.submit(worker, start_val) for start_val in start_values]
        for future in futures:
            result = future.result()  # block until a result is returned
            print(result)
        # or you can do: results = executor.map(worker, start_values)


if __name__ == '__main__':
    main()
Prints (the exact numbers vary from run to run, since they come from randint):
[20, 14, 11]
[104, 104, 108]
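The commented-out `executor.map` alternative, and `concurrent.futures.as_completed` for handling results in the order they finish, can be sketched as below. This is a minimal sketch that reuses the dummy `worker`; the helper names `pool_with_map` and `pool_as_completed` are hypothetical, chosen only for illustration.

```python
import concurrent.futures
from random import randint as ri


def worker(start_val):
    # Dummy stand-in for the real network call, same as in the answer above
    return [ri(0, 10) + start_val, ri(0, 10) + start_val, ri(0, 10) + start_val]


def pool_with_map(start_values, number_threads=2):
    # Hypothetical helper: executor.map yields results in the same order as
    # start_values, blocking until each one is ready
    with concurrent.futures.ThreadPoolExecutor(max_workers=number_threads) as executor:
        return list(executor.map(worker, start_values))


def pool_as_completed(start_values, number_threads=2):
    # Hypothetical helper: collects results keyed by the input that produced them
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=number_threads) as executor:
        # Remember which start value each future belongs to
        future_to_start = {executor.submit(worker, s): s for s in start_values}
        # as_completed yields futures as they finish, not in submission order
        for future in concurrent.futures.as_completed(future_to_start):
            results[future_to_start[future]] = future.result()
    return results


if __name__ == '__main__':
    start_values = [10, 100]
    print(pool_with_map(start_values))
    print(pool_as_completed(start_values))
```

`executor.map` keeps the results in input order, just like the loop over `futures` in the answer. `as_completed` lets the main thread start processing whichever download finishes first, which can help when network latencies differ a lot. In both cases an exception raised inside `worker` is re-raised in the main thread when its result is retrieved, rather than being lost inside the worker thread.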