I am new to coding threads. I have 5,000,000
tasks I need to do with only 9
threads available. My current code seems to be creating more tasks than the available threads. I haven't done the calculation but it roughly means I will have ~4,999,900 tasks in the queue when only 100 tasks have been completed (you get the point). This means the memory is being used inefficiently. I could've had 81 tasks in the queue with 9 threads, and as soon as 1 task is complete, I add a new task to the queue rather than making all 5,000,000 tasks at once.
My question is, how do I make the task queue smaller (say 81 tasks) and then once a task is complete, I add a new task to the queue. Hence, maintaining an 81 task queue. Also, I append the thread threads.append(executor.submit(do_task, i))
to the threads
list. That also means the threads
list is going to be 5,000,000 long at the end of execution, hence, inefficient. I only append the threads so it doesn't jump the code to the printing of nums
. How can I fix that too? While making sure print(f'Nums: {nums}')
is only printed once all 5,000,000 tasks have been completed.
import threading
import time
from concurrent.futures import ThreadPoolExecutor
def do_task(i):
time.sleep(2)
nums.append(i)
print(f'Thread: {threading.current_thread()} | i = {i}')
executor = ThreadPoolExecutor(max_workers=9)
threads = []
nums = []
for i in range(0, 5000000):
time.sleep(0.5) # this sleep function represents me creating the task for the thread to do
print(f'About to submit {i}')
threads.append(executor.submit(do_task, i))
print(f'Thread count: {len(threads)}')
for thread in threads:
thread.result()
print(f'Nums: {nums}')
Here's one easy-enough way, using the package's flexible wait()
function:
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
NTODO = 5000000
MOST_AT_ONCE = 12
def do_task(i):
time.sleep(2)
nums.append(i)
print(f'Thread: {threading.current_thread()} | i = {i}')
def consume(threads, max_to_leave=MOST_AT_ONCE):
while len(threads) > max_to_leave:
done, _ = wait(threads, return_when=FIRST_COMPLETED)
for t in done:
print(f'closing out i = {threads[t]}')
t.result()
del threads[t]
executor = ThreadPoolExecutor(max_workers=9)
threads = {}
nums = []
for i in range(0, NTODO):
time.sleep(0.5) # this sleep function represents me creating the task for the thread to do
print(f'About to submit {i}')
threads[executor.submit(do_task, i)] = i
consume(threads)
consume(threads, 0)
print(f'Thread count: {len(threads)}')
print(f'Nums: {nums}')
Most of that is just adding new bells & whistles to what you already wrote.
Instead of using a list for threads, it uses a dict with thread keys, because deletion of an arbitrary dict key is efficient. And it's a dict instead of a set so that the thread can be mapped back to the integer originally passed to the thread. That's to make things clearer - it's not at all essential.
So after firing off a new thread task, consume(threads)
is called to wait until at most MOST_AT_ONCE
threads are still working. The result is retrieved for each thread that completes work while it's waiting, and the thread is removed from threads
.
After the main loop ends, consume(threads, 0)
is called to wait until no (0) threads are still running.
BTW, to make this easier for you to follow at first, I continued to use your names. In particular, threads
. But that's misleading, and you should get over it ;-) .submit()
returns a Future
object, not a thread object. Indeed, that's why this code works at all. Thread objects are reused, but each Future
object is unique, which makes Future
objects suitable as dict keys (or set elements).