Tags: python, multithreading, synchronization, producer-consumer

Index out of Range in consumer Thread


I have a thread that should wait for tasks to arrive from multiple other threads and execute them until no tasks are left. When no tasks are left, it should wait again.

I tried it with this class (only relevant code):

from threading import Event, Thread

class TaskExecutor(object):
    def __init__(self):
        self.event = Event()
        self.taskInfos = []
        self._running = True

        task_thread = Thread(target=self._run_worker_thread)
        self._running = True
        task_thread.daemon = True
        task_thread.start()

    def _run_worker_thread(self):
        while self.is_running():
            if len(self.taskInfos) == 0:
                self.event.clear()
                self.event.wait()

            try:
                msg, task = self.taskInfos[0]
                del self.taskInfos[0]
                if task:
                    task.execute(msg)
            except Exception as e:
                logger.error("Error " + str(e))

    def schedule_task(self, msg, task):
        self.taskInfos.append((msg, task))
        self.event.set()

Multiple threads call schedule_task whenever they want to add a task.

The problem is that I sometimes get the error list index out of range from the line msg, task = self.taskInfos[0]. The del self.taskInfos[0] just below it is the only place where I delete a task.

How can that happen? I feel like I have to synchronize everything, but there is no such keyword in Python, and reading the docs led me to this pattern.


Solution

  • This code is pretty hopeless - give up on it and do something sane ;-) What's sane? Use a Queue.Queue. That's designed to do what you want.

    Replace:

        self.event = Event()
        self.taskInfos = []
    

    with:

        self.taskInfos = Queue.Queue()
    

    (of course you have to import Queue too; in Python 3 the module is named queue, so the class is queue.Queue).

    To add a task:

        self.taskInfos.put((msg, task))
    

    To get a task:

        msg, task = self.taskInfos.get()
    

    That will block until a task is available. There are also options to do a non-blocking .get() attempt, and to do a .get() attempt with a timeout (read the docs).
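Putting those replacements together, the class from the question might look roughly like the sketch below. It is only an illustration under a few assumptions: `is_running` and `stop` are hypothetical helpers (the question omits them), the logger is a plain `logging` logger, the import is written so it works under both the Python 2 and Python 3 module names, and the 0.5 second timeout is an arbitrary value chosen so the worker can re-check its running flag.

```python
import logging
import threading

try:
    import queue            # Python 3 module name
except ImportError:
    import Queue as queue   # Python 2 module name, as used in the answer

logger = logging.getLogger(__name__)


class TaskExecutor(object):
    def __init__(self):
        # The queue does its own locking, so no Event is needed.
        self.taskInfos = queue.Queue()
        self._running = True

        task_thread = threading.Thread(target=self._run_worker_thread)
        task_thread.daemon = True
        task_thread.start()

    def is_running(self):
        # Hypothetical helper: the question does not show this method.
        return self._running

    def stop(self):
        # Hypothetical helper: asks the worker loop to exit.
        self._running = False

    def _run_worker_thread(self):
        while self.is_running():
            try:
                # Block until a task arrives, but wake up periodically
                # so the running flag gets re-checked.
                msg, task = self.taskInfos.get(timeout=0.5)
            except queue.Empty:
                continue

            try:
                if task:
                    task.execute(msg)
            except Exception as e:
                logger.error("Error " + str(e))

    def schedule_task(self, msg, task):
        # Safe to call from any number of producer threads.
        self.taskInfos.put((msg, task))
```

Producers keep calling `schedule_task(msg, task)` exactly as before. If the worker never needs to stop, a plain blocking `self.taskInfos.get()` without the timeout would be enough.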

    Trying to fix the code you have would be a nightmare. At heart, Events are not powerful enough to do what you need for thread safety in this context. In fact, any time you see code doing Event.clear(), it's probably buggy (subject to races).

    Edit: what will go wrong next

    If you continue trying to fix this code, this is likely to happen next:

    the queue is empty
    thread 1 does len(self.taskInfos) == 0, and loses its timeslice
    thread 2 does self.taskInfos.append((msg, task))
             and does self.event.set()
             and loses its timeslice
    thread 1 resumes and does self.event.clear()
             and does self.event.wait()
    

    Oops! Now thread 1 waits forever, even though a task is on the queue.
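The interleaving above is hard to trigger on demand, but it can be replayed step by step in a single thread. The sketch below is only a deterministic illustration, not the real multithreaded program: the names `task_infos`, `consumer_saw_empty` and `woke_up` are made up for the demo, and the 0.1 second timeout exists only so the script terminates instead of hanging the way the real worker would.

```python
from threading import Event

event = Event()
task_infos = []

# Step 1 (thread 1, the worker): sees an empty list, then is preempted
# before it gets to clear() and wait().
consumer_saw_empty = (len(task_infos) == 0)

# Step 2 (thread 2, a producer): appends a task and signals the event.
task_infos.append(("msg", None))
event.set()

# Step 3 (thread 1 resumes): acts on its stale check and clears the event,
# which erases the producer's signal.
if consumer_saw_empty:
    event.clear()

# Step 4 (thread 1): waits. In the real code there is no timeout,
# so the worker would block here indefinitely.
woke_up = event.wait(0.1)

print("tasks pending: %d" % len(task_infos))
print("worker woke up: %s" % woke_up)
```

The wait times out with one task still pending, which is the lost wakeup described above. A `Queue.Queue` avoids this because the emptiness check and the wait happen under the same internal lock.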

    That's why Python supplies Queue.Queue. You're exceedingly unlikely to get a correct solution using a feeble Event.