I'm trying to implement a Queue
processing thread in python as follows:
from queue import Queue
from threading import Thread
import sys
class Worker(Thread):
def __init__(self, queue):
# Call thread constructor
self.queue = queue
def run(self):
while True:
task = self.queue.get()
# doTask()
self.queue.task_done()
queue = Queue()
thread = Worker(thread)
thread.start()
while True:
inp = user_input()
if condition(inp):
queue.put(sometask())
else:
queue.join()
thread.join()
sys.exit(0)
In this example, suppose user decides to exit
without adding any item to queue. Then my thread will be blocking at self.queue.get
and I queue.join()
won't work. Because of that, I can't perform a proper exit
.
How can I deal with this issue?
You can give Queue.get
a timeout and use a stop event:
from Queue import Queue, Empty
from threading import Thread, Event
import sys
class Worker(Thread):
def __init__(self, queue, stop):
# Call thread constructor
self.queue = queue
self.stop = stop
super(Worker, self).__init__()
def run(self):
while not self.stop.is_set():
try:
task = self.queue.get(timeout=1)
except Empty:
continue
# doTask()
self.queue.task_done()
queue = Queue()
stop = Event()
thread = Worker(queue, stop)
thread.start()
while True:
inp = raw_input()
if inp:
queue.put(inp)
else:
stop.set()
queue.join()
thread.join()
sys.exit(0)
This adds a condition to the Thread worker's while loop so that you can stop it whenever. You have to give the Queue.get
a timeout so that it can check the stop event periodically.
You can use a sentinel rather than timeout:
from Queue import Queue
from threading import Thread
import sys
_sentinel = Object()
class Worker(Thread):
def __init__(self, queue, sentinel=None):
# Call thread constructor
self.queue = queue
self.sentinel = sentinel
super(Worker, self).__init__()
def run(self):
while True:
task = self.queue.get()
if task is self.sentinel:
self.queue.task_done()
return
# doTask()
self.queue.task_done()
queue = Queue()
thread = Worker(queue, sentinel=_sentinel)
thread.start()
while True:
inp = raw_input()
if inp:
queue.put(inp)
else:
queue.put(_sentinel)
queue.join()
thread.join()
sys.exit(0)
Thanks to Bakuriu for the sentinel = Object() suggestion.