
Python program runs forever using Queue and Thread


I'm using Queue and Thread to write a program that processes a list of objects with several lightweight threads, limiting the number of workers to 3.

from threading import Thread
from queue import Queue

def worker(q_objects, q_objects_adj):
    return q_objects.get(), q_objects_adj.get()
    
def get_obj_adj(object):
    return f"{object}_adj"

objects_list = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p']

q_objects = Queue()
q_objects_adj = Queue()

for i in range(3):
    t = Thread(target=worker, args=[q_objects, q_objects_adj], daemon=True)
    t.start()

for obj in objects_list:
    q_objects.put(obj)
    q_objects_adj.put(get_obj_adj(obj))

q_objects.join() 
#q_objects_adj.join()

The program now runs forever and never returns. What am I doing wrong?


Solution

  • OK, since you edited your original code, I edited my answer as well. Here is my new answer:

    from threading import Thread
    from queue import Queue
    import time
    
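    def worker(thread_id, q_objects, q_objects_adj):
        print("Thread", thread_id, "starts")
        # Keep taking items until the queue looks empty
        # (note: empty() followed by get() is not atomic between threads)
        while not q_objects.empty():
            data = q_objects.get(), q_objects_adj.get()
            print("Thread", thread_id, data)
            # Mark both items as processed so that .join() can return
            q_objects.task_done()
            q_objects_adj.task_done()
            time.sleep(0.01)  # fake some work
        print("Thread", thread_id, "ends")
        return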
    
    def get_obj_adj(obj):
        return f"{obj}_adj"
    
    objects_list = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p']
    
    q_objects = Queue()
    q_objects_adj = Queue()
    
    for obj in objects_list:
        q_objects.put(obj)
        q_objects_adj.put(get_obj_adj(obj))
    
    print("Start")
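    # Start 3 workers; daemon=True lets the program exit even if a worker is still running
    for i in range(3):
        t = Thread(target=worker, args=[i, q_objects, q_objects_adj], daemon=True)
        t.start()
    # Wait until task_done() has been called for every item of both queues
    q_objects.join()
    q_objects_adj.join()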
    print("End")
    

    And here is the result:

    Start
    Thread 0 starts
    Thread 0 ('a', 'a_adj')
    Thread 1 starts
    Thread 2 starts
    Thread 1 ('b', 'b_adj')
    Thread 2 ('c', 'c_adj')
    Thread 1 ('d', 'd_adj')
    Thread 0 ('e', 'e_adj')
    Thread 2 ('f', 'f_adj')
    Thread 1 ('g', 'g_adj')
    Thread 0 ('h', 'h_adj')
    Thread 2 ('i', 'i_adj')
    Thread 1 ('j', 'j_adj')
    Thread 0 ('k', 'k_adj')
    Thread 2 ('l', 'l_adj')
    Thread 1 ('m', 'm_adj')
    Thread 0 ('n', 'n_adj')
    Thread 2 ('o', 'o_adj')
    Thread 1 ('p', 'p_adj')
    Thread 0 ends
    End
    

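    What was missing in your code:

      • while not q_objects.empty(): in the worker, so that each thread keeps looping over the items instead of taking just one and returning;
      • .task_done() in the workers, so that .join() does not block forever.

    Queue.join() only returns once task_done() has been called for every item that was put() into the queue. In your version each worker took a single item, never called task_done(), and returned, so q_objects.join() waited forever.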
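The blocking behaviour of Queue.join() is easy to reproduce on its own. The minimal sketch below (the names waiter, blocked_before and blocked_after are just for illustration) takes an item out of a queue without calling task_done(), runs q.join() in a helper thread, and checks whether that thread is still waiting before and after task_done():

```python
import threading
from queue import Queue

q = Queue()
q.put("a")
q.get()  # the item is removed from the queue but not yet marked as done

# Run q.join() in a helper daemon thread so that we can watch it block.
waiter = threading.Thread(target=q.join, daemon=True)
waiter.start()
waiter.join(timeout=0.2)
blocked_before = waiter.is_alive()  # True: join() is still waiting

q.task_done()  # one task_done() for the one put(): nothing is pending anymore
waiter.join(timeout=1)
blocked_after = waiter.is_alive()  # False: join() has returned

print(blocked_before, blocked_after)
```

This is the situation of the question's code: 16 items put, a few taken with get(), and no task_done() at all, so join() never returns.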

    I also added a time.sleep() to simulate some work; otherwise, in such a simple example, the first thread would have consumed all the data on its own.
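The empty()-based loop relies on daemon threads: the main thread waits on the queues, not on the workers, so in the output above only Thread 0 prints "ends" before "End" and the other daemon threads are simply abandoned when the program exits. If you want every worker to finish cleanly, a common alternative is to let the workers block on get() and to queue one stop marker per worker. A sketch, assuming a hypothetical STOP sentinel object and collecting the messages in a list instead of printing them:

```python
import threading
import time
from queue import Queue

NUM_WORKERS = 3
STOP = object()  # hypothetical sentinel: one is queued per worker

def worker(thread_id, q, log):
    log.append(f"Thread {thread_id} starts")
    while True:
        item = q.get()  # blocks until an item (or a sentinel) is available
        if item is STOP:
            q.task_done()
            break  # each worker consumes exactly one STOP, then stops
        log.append(f"Thread {thread_id} {item}")
        q.task_done()
        time.sleep(0.01)  # fake some work, as in the answer
    log.append(f"Thread {thread_id} ends")

q = Queue()
log = []
threads = [threading.Thread(target=worker, args=(i, q, log)) for i in range(NUM_WORKERS)]
for t in threads:
    t.start()
for obj in "abcdefghijklmnop":
    q.put(obj)
for _ in threads:
    q.put(STOP)  # queued after the real items, so FIFO order hands them out last
for t in threads:
    t.join()  # safe: every worker returns after taking its STOP
print(sum(line.endswith("ends") for line in log))
```

Here the threads do not need daemon=True, because the main thread joins each of them explicitly.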


    [Edit, in reply to a comment] Well, you just need to put both values into a single queue:

    for obj in objects_list:
        q_objects.put([obj, get_obj_adj(obj)])
    

    i.e. add a list as a single item in the Queue. Then, in the worker, a simple data = q_objects.get() retrieves this two-element list, and you access the values with data[0] and data[1] :)
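Put together, the single-queue version could look like the sketch below. It also swaps the empty() check for get_nowait(), which either returns an item or raises queue.Empty in one call; with empty() followed by get(), two threads can both see one remaining item, and the loser then blocks in get(). The results list replaces the prints so that the pairing can be checked afterwards:

```python
import threading
import time
from queue import Empty, Queue

def get_obj_adj(obj):
    return f"{obj}_adj"

def worker(thread_id, q_objects, results):
    while True:
        try:
            data = q_objects.get_nowait()  # one call returns the whole pair
        except Empty:
            break  # queue drained: let the thread end
        obj, obj_adj = data  # same as data[0], data[1]
        results.append((thread_id, obj, obj_adj))
        q_objects.task_done()
        time.sleep(0.01)  # fake some work

objects_list = list("abcdefghijklmnop")
q_objects = Queue()
for obj in objects_list:
    q_objects.put([obj, get_obj_adj(obj)])  # the pair travels as one item

results = []
threads = [threading.Thread(target=worker, args=(i, q_objects, results)) for i in range(3)]
for t in threads:
    t.start()
q_objects.join()  # every item got its task_done()
for t in threads:
    t.join()      # workers exit on Empty, so no daemon threads are needed

# Whichever thread got it, each object arrived together with its own _adj value.
print(all(obj_adj == get_obj_adj(obj) for _, obj, obj_adj in results))
```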

    And so that you understand why the two-queue version was not safe: when you write a = f1(), f2(), f1 is evaluated first, then f2, and both results are packed into a tuple assigned to a. The two calls happen one after the other, not as a single atomic step, so another thread can run in between.
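A tiny demonstration of that evaluation order, with two hypothetical functions f1 and f2 that record when they are called:

```python
calls = []

def f1():
    calls.append("f1")
    return 1

def f2():
    calls.append("f2")
    return 2

a = f1(), f2()
print(a)      # (1, 2)
print(calls)  # ['f1', 'f2']: two separate calls, run left to right
```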

    In a threaded context, this could happen:

     1. Thread0 : data = q_objects.get(), q_objects_adj.get()
     2. Thread0 : q_objects.get is computing
     3. Thread1 : data = q_objects.get(), q_objects_adj.get()
     4. Thread1 : q_objects.get is computing
    

    And now, if Thread1's call finishes first, you are doomed, because here is what comes next:

     5. Thread1 : q_objects.get finishes and returns B (Thread0's call has already taken A but has not returned yet)
     6. Thread1 : q_objects_adj.get now runs before Thread0's, and WILL return A_adj, since nobody has asked for it yet


    And the result is out of sync: Thread0 will end up with A and B_adj, Thread1 with B and A_adj
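The unlucky interleaving can be forced deterministically. In the sketch below, two threading.Event objects (an assumption for the demo; in real code the scheduler decides) pause Thread0 between its two get() calls while Thread1 does both of its own. Thread0's tuple expression is split into two statements only to make room for the pause:

```python
import threading
from queue import Queue

q_objects, q_objects_adj = Queue(), Queue()
for obj in ["A", "B"]:
    q_objects.put(obj)
    q_objects_adj.put(f"{obj}_adj")

results = {}
t0_got_obj = threading.Event()  # Thread0 has finished its first get()
t1_done = threading.Event()     # Thread1 has finished both gets

def thread0():
    obj = q_objects.get()       # takes "A"
    t0_got_obj.set()
    t1_done.wait()              # paused here: Thread1 overtakes us
    results["Thread0"] = (obj, q_objects_adj.get())

def thread1():
    t0_got_obj.wait()           # start only after Thread0 took "A"
    results["Thread1"] = (q_objects.get(), q_objects_adj.get())
    t1_done.set()

threads = [threading.Thread(target=thread0), threading.Thread(target=thread1)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)  # {'Thread1': ('B', 'A_adj'), 'Thread0': ('A', 'B_adj')}
```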

    Note that this only happens when two threads interleave their calls at exactly the wrong moment but... you know what, shit happens ^^
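If you really want to keep two separate queues, another option (not the one from the answer; the single queue is simpler) is to make the two gets one critical section with a threading.Lock. A minimal sketch, assuming both queues are filled in the same order before the workers start and that nothing else reads from them; pair_lock and get_pair are hypothetical names:

```python
import threading
from queue import Empty, Queue

pair_lock = threading.Lock()  # hypothetical name: guards the pair of gets

def get_pair(q_objects, q_objects_adj):
    # Only one thread at a time can run both gets, so nobody slips in between.
    # get_nowait() raises Empty instead of blocking while the lock is held
    # (a blocking get() inside the lock could stall every other worker).
    with pair_lock:
        obj = q_objects.get_nowait()
        obj_adj = q_objects_adj.get_nowait()
    return obj, obj_adj

def worker(q_objects, q_objects_adj, results):
    while True:
        try:
            results.append(get_pair(q_objects, q_objects_adj))
        except Empty:
            break

q_objects, q_objects_adj = Queue(), Queue()
for obj in "abcdefghijklmnop":
    q_objects.put(obj)
    q_objects_adj.put(f"{obj}_adj")

results = []
threads = [threading.Thread(target=worker, args=(q_objects, q_objects_adj, results)) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # no Queue.join() here, so task_done() is not needed
print(all(adj == f"{obj}_adj" for obj, adj in results))
```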