Search code examples
pythonpython-3.xmultithreadingpython-multithreadingthreadpoolexecutor

How to I make sure my threads aren't overloaded?


I am new to coding threads. I have 5,000,000 tasks I need to do with only 9 threads available. My current code seems to be creating more tasks than the available threads. I haven't done the calculation but it roughly means I will have ~4,999,900 tasks in the queue when only 100 tasks have been completed (you get the point). This means the memory is being used inefficiently. I could've had 81 tasks in the queue with 9 threads, and as soon as 1 task is complete, I add a new task to the queue rather than making all 5,000,000 tasks at once.

My question is, how do I make the task queue smaller (say 81 tasks) and then once a task is complete, I add a new task to the queue. Hence, maintaining an 81 task queue. Also, I append the thread threads.append(executor.submit(do_task, i)) to the threads list. That also means the threads list is going to be 5,000,000 long at the end of execution, hence, inefficient. I only append the threads so it doesn't jump the code to the printing of nums. How can I fix that too? While making sure print(f'Nums: {nums}') is only printed once all 5,000,000 tasks have been completed.

import threading
import time
from concurrent.futures import ThreadPoolExecutor

def do_task(i):
    time.sleep(2)
    nums.append(i)
    print(f'Thread: {threading.current_thread()} | i = {i}')

executor = ThreadPoolExecutor(max_workers=9)
threads = []
nums = []
for i in range(0, 5000000):
    time.sleep(0.5) # this sleep function represents me creating the task for the thread to do
    print(f'About to submit {i}')
    threads.append(executor.submit(do_task, i))

print(f'Thread count: {len(threads)}')
for thread in threads:
    thread.result()

print(f'Nums: {nums}')

Solution

  • Here's one easy-enough way, using the package's flexible wait() function:

    import threading
    import time
    from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
    
    NTODO = 5000000
    MOST_AT_ONCE = 12
    
    def do_task(i):
        time.sleep(2)
        nums.append(i)
        print(f'Thread: {threading.current_thread()} | i = {i}')
    
    def consume(threads, max_to_leave=MOST_AT_ONCE):
        while len(threads) > max_to_leave:
            done, _ = wait(threads, return_when=FIRST_COMPLETED)
            for t in done:
                print(f'closing out i = {threads[t]}')
                t.result()
                del threads[t]
    
    executor = ThreadPoolExecutor(max_workers=9)
    threads = {}
    nums = []
    for i in range(0, NTODO):
        time.sleep(0.5) # this sleep function represents me creating the task for the thread to do
        print(f'About to submit {i}')
        threads[executor.submit(do_task, i)] = i
        consume(threads)
    consume(threads, 0)
    
    print(f'Thread count: {len(threads)}')
    print(f'Nums: {nums}')
    

    Most of that is just adding new bells & whistles to what you already wrote.

    Instead of using a list for threads, it uses a dict with thread keys, because deletion of an arbitrary dict key is efficient. And it's a dict instead of a set so that the thread can be mapped back to the integer originally passed to the thread. That's to make things clearer - it's not at all essential.

    So after firing off a new thread task, consume(threads) is called to wait until at most MOST_AT_ONCE threads are still working. The result is retrieved for each thread that completes work while it's waiting, and the thread is removed from threads.

    After the main loop ends, consume(threads, 0) is called to wait until no (0) threads are still running.

    BTW, to make this easier for you to follow at first, I continued to use your names. In particular, threads. But that's misleading, and you should get over it ;-) .submit() returns a Future object, not a thread object. Indeed, that's why this code works at all. Thread objects are reused, but each Future object is unique, which makes Future objects suitable as dict keys (or set elements).