Tags: python, multithreading, multiprocess

Tell consumers to stop waiting for queue elements


Here's an example. I have one producer and several consumers.

#!/usr/bin/env python2

from multiprocessing import Process, Queue
import time

def counter(low, high):
    current = low 
    while current <= high:
        yield current
        current += 1

def put_tasks(q):
    for c in counter(0, 9):
        q.put(c)
        time.sleep(.1)
    print('put_tasks: no more tasks') 

def work(id, q):
    while True:
        task = q.get()  # blocks forever once the producer is done
        print('process %d: %s' % (id, task))
        time.sleep(.3)
    print('process %d: done' % id)  # never reached: the loop never exits

if __name__ == '__main__':
    q = Queue(2)
    task_gen = Process(target=put_tasks, args=(q,))
    processes = [Process(target=work, args=(id, q)) for id in range(0, 3)] 

    task_gen.start()
    for p in processes:
        p.start()
    for p in processes:
        p.join()

counter is just a number generator for put_tasks. Typically, I would have several thousand tasks instead of just 10, as in this example. The point of this code is to feed the queue with tasks incrementally.

The problem is that the consumers cannot know in advance how many tasks they will have to process, but the put_tasks function does know when it is done (it then prints no more tasks).

Sample output:

process 2: 0
process 0: 1
process 1: 2
process 2: 3
process 0: 4
process 1: 5
process 2: 6
process 0: 7
process 1: 8
process 2: 9
put_tasks: no more tasks

All tasks get processed, but the program then hangs (each process gets stuck on q.get()). I would like it to terminate when all tasks have been processed, without sacrificing speed or safety (no ugly timeouts).

Any ideas?


Solution

  • I suggest putting a sentinel value on the end of the queue:

    def put_tasks(q):
        ...
    
        print('put_tasks: no more tasks')
        q.put(end_of_queue)
    
    def work(id, q):
        while True:
            task = q.get()

            if task == end_of_queue:
                # put the sentinel back so the other consumers see it too
                q.put(task)
                print('process %d: done' % id)
                return

            print('process %d: %s' % (id, task))
            time.sleep(.1)
    
    class Sentinel:
        def __init__(self, id):
            self.id = id

        def __eq__(self, other):
            # compare by id rather than identity: the queue pickles items,
            # so each process receives its own copy of the sentinel
            if isinstance(other, Sentinel):
                return self.id == other.id

            return NotImplemented
    
    if __name__ == '__main__':
        q = Queue(2)
        # created before the workers start, so forked workers inherit the
        # end_of_queue binding; copies read from the queue compare equal by id
        end_of_queue = Sentinel("end of queue")
        task_gen = Process(target=put_tasks, args=(q,))
        processes = [Process(target=work, args=(id, q)) for id in range(0, 3)]
        ...
    

    I don't seem to be able to use a bare object() as the sentinel, because the worker processes end up holding different instances (everything that passes through the queue is pickled and unpickled), so the copies don't compare equal.
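
    To see why (a minimal sketch of my own, not part of the original answer): everything that goes through a multiprocessing queue is pickled and unpickled, and a bare object() compares by identity, which does not survive the round trip:

    import pickle

    marker = object()
    copy = pickle.loads(pickle.dumps(marker))  # what a Queue does to each item
    print(copy is marker)   # False: unpickling builds a new instance
    print(copy == marker)   # False: object() equality falls back to identity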

    If you ever wish to generate random sentinels, you can use the uuid module to generate random ids:

    import uuid
    
    class Sentinel:
        def __init__(self):
            self.id = uuid.uuid4()
    
        def __eq__(self, other):
            if isinstance(other, Sentinel):
                return self.id == other.id
    
            return NotImplemented
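
    As a quick sanity check (my sketch, not from the answer), a sentinel built with the class above keeps its equality across a pickle round trip, while every freshly created sentinel is distinct:

    import pickle

    s = Sentinel()
    t = pickle.loads(pickle.dumps(s))
    print(s == t)           # True: the uuid survives the round trip
    print(s == Sentinel())  # False: a fresh sentinel draws a new uuid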
    

    Finally, zch's answer used None for the sentinel, which is perfectly adequate as long as None can never appear in the queue as a real task. The class-based sentinel above works for mostly-arbitrary queue contents. A standalone sketch of the None variant follows.
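
    For completeness, here is a minimal standalone sketch (my own, assuming the number of consumers is fixed and known) of that None variant applied to the question's code: the producer puts one None per consumer, so no worker ever has to put the sentinel back:

    from multiprocessing import Process, Queue
    import time

    NUM_CONSUMERS = 3

    def put_tasks(q):
        for c in range(10):
            q.put(c)
            time.sleep(.1)
        print('put_tasks: no more tasks')
        for _ in range(NUM_CONSUMERS):
            q.put(None)  # one sentinel per consumer

    def work(id, q):
        while True:
            task = q.get()
            if task is None:  # sentinel: this consumer is done
                print('process %d: done' % id)
                return
            print('process %d: %s' % (id, task))
            time.sleep(.3)

    if __name__ == '__main__':
        q = Queue(2)
        task_gen = Process(target=put_tasks, args=(q,))
        consumers = [Process(target=work, args=(i, q)) for i in range(NUM_CONSUMERS)]

        task_gen.start()
        for p in consumers:
            p.start()
        for p in consumers:
            p.join()
        task_gen.join()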