Search code examples
python, multithreading, python-3.x, blocking, queue

.join for possible empty Queue in python


I'm trying to implement a Queue processing thread in python as follows:

from queue import Queue
from threading import Thread
import sys    

class Worker(Thread):
    """Background thread that takes tasks off a queue and marks each one done.

    Note: run() loops forever and has no exit condition, so joining this
    thread never returns.
    """

    def __init__(self, queue):
        """Initialise the thread and remember the queue to consume from.

        queue: object providing a blocking get() and task_done().
        """
        # Call thread constructor -- start() raises RuntimeError without it.
        super().__init__()
        self.queue = queue

    def run(self):
        """Consume tasks forever, acknowledging each one with task_done()."""
        while True:
            # Blocks until an item is available.
            task = self.queue.get()
            # doTask()
            self.queue.task_done()

queue = Queue()
# The worker consumes from the queue (the original passed the not-yet-defined
# name `thread` here, which raises NameError).
thread = Worker(queue)
thread.start()

# Feed tasks to the worker until the user asks to exit.
while True:
    inp = user_input()

    if condition(inp):
        queue.put(sometask())
    else:
        # Wait for every queued task to be acknowledged with task_done().
        queue.join()
        # NOTE: Worker.run() never returns, so this join blocks forever --
        # this is the problem the question asks about.
        thread.join()
        sys.exit(0)

In this example, suppose the user decides to exit without adding any item to the queue. Then my thread will be blocked in self.queue.get() and joining won't work: queue.join() returns, but thread.join() waits forever. Because of that, I can't perform a proper exit.

How can I deal with this issue?


Solution

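  • You can give Queue.get a timeout and use a stop event (the code below targets Python 2; on Python 3, import from `queue` instead of `Queue` and call `input()` instead of `raw_input()`):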

    from Queue import Queue, Empty
    from threading import Thread, Event
    import sys
    
    class Worker(Thread):
        """Thread that consumes tasks from a queue until a stop event is set."""

        def __init__(self, queue, stop):
            """Initialise the thread, then store the work queue and stop event.

            queue: source of tasks; must provide get(timeout=...) and task_done().
            stop: object whose is_set() tells the worker to leave its loop.
            """
            # Call thread constructor
            super(Worker, self).__init__()
            self.stop = stop
            self.queue = queue

        def run(self):
            """Process tasks, re-checking the stop event before every fetch."""
            while True:
                if self.stop.is_set():
                    break
                try:
                    task = self.queue.get(timeout=1)
                except Empty:
                    # Nothing arrived within the timeout; loop back so the
                    # stop event gets checked again.
                    continue
                # doTask()
                self.queue.task_done()
    
    queue = Queue()
    stop = Event()
    thread = Worker(queue, stop)
    thread.start()

    # Queue every non-empty input line; an empty line requests shutdown.
    while True:
        inp = raw_input()

        if inp:
            queue.put(inp)
        else:
            # Drain first: wait until every queued item has been acknowledged
            # with task_done(). Setting the stop event before this point lets
            # the worker exit with items still queued, and queue.join() would
            # then block forever.
            queue.join()
            # Only now tell the worker to leave its loop, and wait for it.
            stop.set()
            thread.join()
            sys.exit(0)
    

    This adds a condition to the worker thread's while loop so that you can stop it at any time. You have to give Queue.get a timeout so that the loop can check the stop event periodically.

    Update

    You can use a sentinel rather than timeout:

    from Queue import Queue
    from threading import Thread
    import sys
    
    # Unique marker object: putting it on the queue tells the worker to stop.
    # (The builtin is lowercase `object`; `Object` is not defined.)
    _sentinel = object()
    
    class Worker(Thread):
        """Thread that consumes tasks from a queue until it receives the sentinel."""

        def __init__(self, queue, sentinel=None):
            """Initialise the thread, then store the work queue and sentinel.

            queue: source of tasks; must provide get() and task_done().
            sentinel: object compared by identity against each task; receiving
                it ends run(). Defaults to None.
            """
            # Call thread constructor
            super(Worker, self).__init__()
            self.sentinel = sentinel
            self.queue = queue

        def run(self):
            """Fetch tasks until the sentinel arrives, acknowledging every item."""
            finished = False
            while not finished:
                task = self.queue.get()
                # Identity check, so only the exact sentinel object stops the loop.
                finished = task is self.sentinel
                if not finished:
                    # doTask()
                    pass
                # The sentinel is acknowledged too, so queue.join() can return.
                self.queue.task_done()
    
    
    queue = Queue()
    thread = Worker(queue, sentinel=_sentinel)
    thread.start()

    # Forward every non-empty input line to the worker; stop at the first
    # empty line.
    while True:
        inp = raw_input()
        if not inp:
            break
        queue.put(inp)

    # Shutdown: enqueue the sentinel behind any pending work, wait until
    # everything (sentinel included) has been acknowledged, then wait for the
    # worker thread itself to finish.
    queue.put(_sentinel)
    queue.join()
    thread.join()
    sys.exit(0)
    

    Thanks to Bakuriu for the sentinel = object() suggestion.