I'm using Queue and Thread to write a program that processes a list of objects with several lightweight threads, with the number of workers limited to 3.
from threading import Thread
from queue import Queue

def worker(q_objects, q_objects_adj):
    return q_objects.get(), q_objects_adj.get()

def get_obj_adj(object):
    return f"{object}_adj"

objects_list = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p']
q_objects = Queue()
q_objects_adj = Queue()

for i in range(3):
    t = Thread(target=worker, args=[q_objects, q_objects_adj], daemon=True)
    t.start()

for obj in objects_list:
    q_objects.put(obj)
    q_objects_adj.put(get_obj_adj(obj))

q_objects.join()
#q_objects_adj.join()
The program now runs forever and never returns. What am I doing wrong?
OK, since you edited your original code, I have edited my answer as well. Here is the new version:
from threading import Thread
from queue import Queue
import time

def worker(thread_id, q_objects, q_objects_adj):
    print("Thread", thread_id, "starts")
    while not q_objects.empty():
        data = q_objects.get(), q_objects_adj.get()
        print("Thread", thread_id, data)
        q_objects.task_done()
        q_objects_adj.task_done()
        time.sleep(0.01)
    print("Thread", thread_id, "ends")
    return

def get_obj_adj(obj):
    return f"{obj}_adj"

objects_list = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p']
q_objects = Queue()
q_objects_adj = Queue()

for obj in objects_list:
    q_objects.put(obj)
    q_objects_adj.put(get_obj_adj(obj))

print("Start")
for i in range(3):
    t = Thread(target=worker, args=[i, q_objects, q_objects_adj], daemon=True)
    t.start()

q_objects.join()
q_objects_adj.join()
print("End")
And the result is:
Start
Thread 0 starts
Thread 0 ('a', 'a_adj')
Thread 1 starts
Thread 2 starts
Thread 1 ('b', 'b_adj')
Thread 2 ('c', 'c_adj')
Thread 1 ('d', 'd_adj')
Thread 0 ('e', 'e_adj')
Thread 2 ('f', 'f_adj')
Thread 1 ('g', 'g_adj')
Thread 0 ('h', 'h_adj')
Thread 2 ('i', 'i_adj')
Thread 1 ('j', 'j_adj')
Thread 0 ('k', 'k_adj')
Thread 2 ('l', 'l_adj')
Thread 1 ('m', 'm_adj')
Thread 0 ('n', 'n_adj')
Thread 2 ('o', 'o_adj')
Thread 1 ('p', 'p_adj')
Thread 0 ends
End
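What was missing in your code was:
1. while not q_objects.empty(): in the worker, so that each thread keeps looping until the queue is drained instead of handling a single item and returning.
2. .task_done() calls in the worker, one per get(), so that .join() can actually return. Without them, join() blocks forever.
Also, I added a time.sleep() to fake some work. Otherwise, in this simple example, the first thread would have eaten up all the data before the others even started.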
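One caveat on the loop above: checking empty() and then calling get() are two separate steps, so another thread can take the last item in between, and the worker then sits blocked in get(). With daemon threads the program still exits, but here is a minimal sketch of a variant that avoids the gap by using get_nowait() and catching queue.Empty. The results list and its lock are names I made up for the illustration, not part of your code.

```python
from queue import Queue, Empty
from threading import Thread, Lock

def worker(q, results, lock):
    while True:
        try:
            item = q.get_nowait()  # raises Empty instead of blocking
        except Empty:
            return  # queue drained: this worker is done
        try:
            with lock:  # hypothetical shared list, guarded for clarity
                results.append(f"{item}_adj")
        finally:
            q.task_done()  # exactly one task_done() per successful get

q = Queue()
for obj in "abcdef":
    q.put(obj)

results = []
lock = Lock()
threads = [Thread(target=worker, args=(q, results, lock)) for _ in range(3)]
for t in threads:
    t.start()

q.join()            # returns once every item has been marked done
for t in threads:
    t.join()        # workers return on their own, no daemon flag needed

print(sorted(results))
```

This only works as written because the queue is fully loaded before the workers start. If items are still being added while workers run, a sentinel value per worker is the usual alternative.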
[New edit to answer comment] Well, you just need to do:
for obj in objects_list:
    q_objects.put([obj, get_obj_adj(obj)])
i.e. put a list holding both values into the Queue as a single item.
Then in the worker, a simple data = q_objects.get() retrieves this list of 2 elements, and you access them with data[0] and data[1] :)
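Put together, the single-queue version could look roughly like this. It is only a sketch under a few assumptions: q_results is a second queue I added just to collect what the workers saw, the object list is shortened, and the worker uses get_nowait() rather than the empty() check.

```python
from queue import Queue, Empty
from threading import Thread

def get_obj_adj(obj):
    return f"{obj}_adj"

def worker(q_objects, q_results):
    while True:
        try:
            data = q_objects.get_nowait()  # one get() returns the whole pair
        except Empty:
            return
        q_results.put((data[0], data[1]))  # obj and its adj always travel together
        q_objects.task_done()

objects_list = ['a', 'b', 'c', 'd', 'e', 'f']
q_objects = Queue()
q_results = Queue()  # hypothetical: only used to inspect the outcome

for obj in objects_list:
    q_objects.put([obj, get_obj_adj(obj)])

threads = [Thread(target=worker, args=(q_objects, q_results)) for _ in range(3)]
for t in threads:
    t.start()
q_objects.join()
for t in threads:
    t.join()

# All workers have finished, so draining here is single-threaded and safe.
pairs = []
while not q_results.empty():
    pairs.append(q_results.get())
print(sorted(pairs))
```

Since each pair comes out of a single get(), no interleaving of threads can separate an object from its _adj value.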
And so that you understand why the original version was not safe:
When you do a = f1(), f2(), f1 is computed first, then f2, and then both results are put in a as a tuple. The two calls happen one after the other, not as a single atomic step.
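If you want to see that ordering for yourself, here is a tiny sketch. The bodies of f1 and f2 and the calls list are placeholders I invented to record the order.

```python
calls = []  # records the order in which the functions run

def f1():
    calls.append("f1")
    return 1

def f2():
    calls.append("f2")
    return 2

a = f1(), f2()  # evaluated left to right, then packed into a tuple

print(calls)
print(a)
```

Anything another thread does between the call to f1 and the call to f2 can change what f2 sees.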
In a threaded context, what could happen is:
1. Thread0 : data = q_objects.get(), q_objects_adj.get()
2. Thread0 : q_objects.get is computing
3. Thread1 : data = q_objects.get(), q_objects_adj.get()
4. Thread1 : q_objects.get is computing
And now... if Thread1 moves on first, you are doomed, because here is what comes next:
5. Thread1 : q_objects.get finishes and gives B (A goes to Thread0, which asked first)
6. Thread1 : q_objects_adj.get is now computing, and WILL give A_adj, since nobody has asked for it yet
And the result is out of sync: Thread0 ends up with A and B_adj, Thread1 ends up with B and A_adj.
Note that this only happens when two threads ask at almost exactly the same time, but... you know what, shit happens ^^
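The real race is hard to trigger on demand, so here is a sketch that simply replays one bad interleaving by hand in a single thread. The t0_* and t1_* variables are stand-ins for what each thread would be holding. The exact order differs a little from the steps above, but the mismatch is the same.

```python
from queue import Queue

q_objects = Queue()
q_objects_adj = Queue()
for obj in ["A", "B"]:
    q_objects.put(obj)
    q_objects_adj.put(f"{obj}_adj")

# Replay of one possible interleaving, not real threads:
t0_obj = q_objects.get()      # Thread0 takes A, then gets paused
t1_obj = q_objects.get()      # Thread1 takes B
t1_adj = q_objects_adj.get()  # Thread1 asks first, so it receives A_adj
t0_adj = q_objects_adj.get()  # Thread0 resumes and is left with B_adj

print("Thread0:", (t0_obj, t0_adj))
print("Thread1:", (t1_obj, t1_adj))
```

Each individual get() behaves correctly here. The problem is only that the pair of gets is not one indivisible operation.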