I have a thread that should wait for tasks to arrive from multiple other threads and execute them until no task is left. When no task is left, it should wait again.
I tried it with this class (only relevant code):
from threading import Event, Thread

class TaskExecutor(object):
    def __init__(self):
        self.event = Event()
        self.taskInfos = []
        self._running = True
        task_thread = Thread(target=self._run_worker_thread)
        self._running = True
        task_thread.daemon = True
        task_thread.start()

    def _run_worker_thread(self):
        while self.is_running():
            if len(self.taskInfos) == 0:
                self.event.clear()
                self.event.wait()
            try:
                msg, task = self.taskInfos[0]
                del self.taskInfos[0]
                if task:
                    task.execute(msg)
            except Exception, e:
                logger.error("Error " + str(e))

    def schedule_task(self, msg, task):
        self.taskInfos.append((msg, task))
        self.event.set()
Multiple threads call schedule_task every time they want to add a task.
The problem is that I sometimes get a "list index out of range" error from the line msg, task = self.taskInfos[0]. The del self.taskInfos[0] below it is the only place where I delete a task.
How can that happen? I feel like I have to synchronize everything, but Python has no such keyword, and reading the docs brought up this pattern.
This code is pretty hopeless: give up on it and do something sane ;-) What's sane? Use a Queue.Queue. That's designed to do exactly what you want.
Replace:
self.event = Event()
self.taskInfos = []
with:
self.taskInfos = Queue.Queue()
(Of course, you have to import Queue too.)
To add a task:
self.taskInfos.put((msg, task))
To get a task:
msg, task = self.taskInfos.get()
That will block until a task is available. There are also options to do a non-blocking .get() attempt, and to do a .get() attempt with a timeout (read the docs).
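As a rough illustration of those two variants (written for Python 3, where the module is spelled queue instead of Queue), both the non-blocking and the timed .get() signal "nothing available" by raising queue.Empty rather than returning a special value:

```python
import queue  # this module is named "Queue" in Python 2

q = queue.Queue()

# Non-blocking attempt: raises queue.Empty immediately if nothing is queued.
try:
    item = q.get(block=False)  # equivalent to q.get_nowait()
except queue.Empty:
    item = None
print(item)  # None

# Timed attempt: waits up to 0.1 seconds, then raises queue.Empty.
try:
    item = q.get(timeout=0.1)
except queue.Empty:
    item = "timed out"
print(item)  # timed out

# Once an item has been put, the timed get returns it right away.
q.put(("hello", None))
msg, task = q.get(timeout=0.1)
print(msg)  # hello
```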
Trying to fix the code you have would be a nightmare. At heart, Events are not powerful enough to do what you need for thread safety in this context. In fact, any time you see code doing Event.clear(), it's probably buggy (subject to races).
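One such race is enough to produce the "list index out of range" from the question. The sketch below replays, in a single thread, one plausible interleaving of the worker (thread 1) and a producer (thread 2). The scheduling order is an assumption for illustration, not a trace of the original program: the producer's set() arrives late, after the worker has already consumed the task and cleared the event, so wait() returns on an empty list.

```python
from threading import Event

# Single-threaded replay of one interleaving of the question's worker
# (thread 1) and a producer calling schedule_task() (thread 2).
event = Event()
taskInfos = []

# thread 2: schedule_task() appends, then loses its timeslice before set()
taskInfos.append(("msg", None))

# thread 1: the list is not empty, so it skips the wait and takes the task
assert len(taskInfos) != 0
msg, task = taskInfos[0]
del taskInfos[0]

# thread 1: next loop iteration, the list is empty, so it clears the event
assert len(taskInfos) == 0
event.clear()

# thread 2: resumes and finishes schedule_task() with a now-stale set()
event.set()

# thread 1: wait() returns immediately because the event is set ...
woke = event.wait(timeout=1)
print(woke)  # True

# ... and the worker indexes an empty list
try:
    msg, task = taskInfos[0]
    error = None
except IndexError as exc:
    error = str(exc)
print(error)  # list index out of range
```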
Edit: what will go wrong next
If you continue trying to fix this code, this is likely to happen next:
The queue is empty.
Thread 1 (the worker) does len(self.taskInfos) == 0 and loses its timeslice.
Thread 2 does self.taskInfos.append((msg, task)) and self.event.set(), then loses its timeslice.
Thread 1 resumes and does self.event.clear() and then self.event.wait().
Oops! Now thread 1 waits forever (or until some other thread happens to schedule another task), even though a task is on the queue.
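The interleaving above can be replayed deterministically in one thread to see the lost wakeup. This is a sketch of the scheduling described in the steps, not a real multithreaded run; the only change from the question's worker is a short timeout on wait() so the replay terminates instead of hanging.

```python
from threading import Event

event = Event()
taskInfos = []

# The queue is empty.
# thread 1: evaluates len(taskInfos) == 0, then loses its timeslice
saw_empty = len(taskInfos) == 0

# thread 2: schedule_task() runs to completion
taskInfos.append(("msg", None))
event.set()

# thread 1: resumes inside the "if" block and clears the set() it never saw
event.clear()
# The real worker calls event.wait() with no timeout here and blocks;
# a short timeout is used so that this replay finishes.
woke = event.wait(timeout=0.1)

print(saw_empty, woke, len(taskInfos))  # True False 1
```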
That's why Python supplies Queue.Queue. You're exceedingly unlikely to get a correct solution using a feeble Event.
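Putting the replacement steps together, the executor might look roughly like the sketch below. It assumes Python 3 (module queue rather than Queue), drops the question's is_running() check for brevity, and adds task_done() calls so that callers can wait for completion with join(); RecordingTask and produce() are hypothetical helpers that exist only to exercise the class from several producer threads.

```python
import logging
import queue  # "Queue" in Python 2
import threading

logger = logging.getLogger(__name__)


class TaskExecutor(object):
    def __init__(self):
        # Queue does its own locking: no Event, no manual list bookkeeping.
        self.taskInfos = queue.Queue()
        task_thread = threading.Thread(target=self._run_worker_thread)
        task_thread.daemon = True
        task_thread.start()

    def _run_worker_thread(self):
        while True:
            # Blocks until a task is available; nothing can be lost or
            # removed between the check and the removal.
            msg, task = self.taskInfos.get()
            try:
                if task:
                    task.execute(msg)
            except Exception as e:
                logger.error("Error %s", e)
            finally:
                # Marks this item as processed so taskInfos.join() can return.
                self.taskInfos.task_done()

    def schedule_task(self, msg, task):
        # Safe to call from any number of threads.
        self.taskInfos.put((msg, task))


class RecordingTask(object):
    """Hypothetical task type that records every message it executes."""

    def __init__(self):
        self.seen = []

    def execute(self, msg):
        self.seen.append(msg)


executor = TaskExecutor()
recorder = RecordingTask()


def produce(producer_id):
    # Hypothetical producer: schedules 100 tasks from its own thread.
    for n in range(100):
        executor.schedule_task((producer_id, n), recorder)


producers = [threading.Thread(target=produce, args=(i,)) for i in range(4)]
for t in producers:
    t.start()
for t in producers:
    t.join()

executor.taskInfos.join()  # wait until the worker has processed every task
print(len(recorder.seen))  # 400
```

Because only the single worker thread ever calls execute(), the task objects themselves need no extra locking in this sketch.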