Verification needed: Clearable Python Queue class


Since I'm not an expert in Python or multi-threaded programming, I'd like to ask whether my implementation is correct.

My aim was to extend the Queue class so that it can be cleared, with the removed items returned. That's all. My implementation is:

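```python
try:
    import Queue                  # Python 2
except ImportError:
    import queue as Queue         # Python 3 renamed the module to "queue"

class ClearableQueue(Queue.Queue):

    def __init__(self, maxsize=0):
        Queue.Queue.__init__(self, maxsize)

    def clear(self):
        # Hold the queue's internal lock while emptying it; "with" releases
        # the lock even if something inside raises.
        with self.mutex:
            copyOfRemovedEntries = list(self.queue)
            self.queue.clear()
            self.unfinished_tasks = 0
            self.all_tasks_done.notify_all()   # wake threads blocked in join()
            self.not_full.notify_all()         # wake threads blocked in put()
        return copyOfRemovedEntries
```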

Is it correct? Thank you.

Update: Unfortunately, this implementation is still insufficient, since task_done() can raise a ValueError after clear() has been called.

More precisely: the queue is intended to be used in a multi-threaded environment. Assume one producer thread and one worker thread (more threads behave the same way). Normally, after a worker thread calls get() and has done its work, it calls task_done(). Now suppose the producer thread calls clear() for some reason right after the worker thread has called get() but before it has called task_done(). clear() itself works, but when the worker thread then calls task_done(), the exception is raised. This is because clear() has reset unfinished_tasks to 0, and task_done() raises ValueError when decrementing unfinished_tasks would take it below zero.
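The problematic sequence can be reproduced deterministically in a single thread by making the worker's and the producer's calls in exactly that order. This is a minimal sketch assuming Python 3, where the module is named `queue` and the internals used by `clear()` (`mutex`, `queue`, `unfinished_tasks`, `all_tasks_done`, `not_full`) are the same as in Python 2; it re-declares a port of the `clear()` method so that it runs on its own. The helper name `reproduce` is hypothetical.

```python
import queue


class ClearableQueue(queue.Queue):
    # Python 3 port of the clear() method from the question (same logic).
    def clear(self):
        with self.mutex:
            removed = list(self.queue)
            self.queue.clear()
            self.unfinished_tasks = 0   # also forgets items already taken by get()
            self.all_tasks_done.notify_all()
            self.not_full.notify_all()
        return removed


def reproduce():
    q = ClearableQueue()
    q.put("a")
    q.put("b")
    item = q.get()        # worker takes "a"; unfinished_tasks is still 2
    removed = q.clear()   # producer clears before the worker's task_done()
    try:
        q.task_done()     # worker reports "a" as done
    except ValueError as exc:
        return item, removed, str(exc)
    return item, removed, None


print(reproduce())
# ('a', ['b'], 'task_done() called too many times')
```

The worker's task_done() fails because clear() also discarded the pending task for "a", even though "a" had already left the queue.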

It would be interesting to know whether this issue can be handled solely by the ClearableQueue class, so that clear() can be called without worries, or whether something outside the class has to coordinate the method calls.

Actually, in my concrete case I don't use the join() method, so I don't need to call task_done(). However, I'd like to make this feature complete; it could be useful for other people as well.


Solution

  • You appear to be suffering from a race condition. If I understand it correctly, you sometimes get this interleaving:

    T1: |----->|------------->|-------------->|
        | get  |    some_opp  | task_done     |
    T2: |---------->|------>|---------------->|
        | other_opp | clear | yet_another_opp |
    

    Where clear runs between T1's get and its task_done. This causes the crash (task_done raises ValueError). As I understand it, you need some way to do this instead:

    T1: |----->|------------->|-------------->|
        | get  |    some_opp  | task_done     |
    T2: |---------->|------------------------>|------>|
        | other_opp | wait_for_task_done      | clear |
    

    If this is correct, you may need a second lock, set by get and released by task_done, which says 'this queue can't be cleared'. You might then also need versions of get and task_done that skip this lock, for special cases where you really know what you're doing.

    An alternative is a finer-grained lock that allows you to do this:

    T1: |----->|------------------->|-------------->|------------->|
        | get  |    some_opp        | task_done     | finish_clear |
    T2: |---------->|-------------->|---------------->|
        | other_opp | partial_clear | yet_another_opp |
    

    Where the worker effectively says 'I'm not done with this task, but you can clear the rest'. partial_clear then records that a clear was attempted while the task was in progress, so that task_done knows it must complete the clear (finish_clear) afterwards. This is starting to get fairly complex, though.
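The answer's first suggestion, where clear() waits until every item handed out by get() has been acknowledged with task_done(), could look roughly like the sketch below. It assumes Python 3's `queue.Queue`, whose `get()` calls the internal `_get()` hook while holding `self.mutex`. Instead of a literal second lock it uses a counter plus a condition variable on the queue's own lock, since several workers may have items in flight at once; the attribute names `_in_flight` and `_idle` are hypothetical and chosen only for this example.

```python
import queue
import threading


class ClearableQueue(queue.Queue):
    """clear() waits until no item taken by get() is still unfinished.

    Sketch only: _in_flight and _idle are hypothetical names chosen here.
    """

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self._in_flight = 0                           # items got but not yet task_done()'d
        self._idle = threading.Condition(self.mutex)  # shares the queue's own lock

    def _get(self):
        # Queue.get() calls _get() with self.mutex held, so the item is counted
        # as in flight in the same critical section that removes it.
        item = super()._get()
        self._in_flight += 1
        return item

    def task_done(self):
        super().task_done()        # decrement unfinished_tasks first...
        with self._idle:
            self._in_flight -= 1   # ...then let a waiting clear() proceed
            if self._in_flight == 0:
                self._idle.notify_all()

    def clear(self):
        with self._idle:
            # The "wait_for_task_done" step; the lock is released while waiting.
            self._idle.wait_for(lambda: self._in_flight == 0)
            removed = list(self.queue)
            self.queue.clear()
            # Nothing is in flight, so every unfinished task was still queued.
            self.unfinished_tasks = 0
            self.all_tasks_done.notify_all()   # wake join() callers
            self.not_full.notify_all()         # wake blocked put() callers
        return removed
```

Two caveats apply to this design: a worker that calls clear() while it still holds an unfinished item will deadlock, and under a steady stream of get() calls clear() may wait for a long time. The "special case" variants of get and task_done mentioned in the answer are not included.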
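A much simpler variant that gives the effect of the second timeline (clear the rest now, and let the in-flight task finish normally) is to subtract only the number of removed items from `unfinished_tasks` instead of resetting it to zero. This is not code from the original answer; it is a sketch assuming Python 3's `queue.Queue` internals and assuming that task_done() is called exactly once per item returned by get(), in which case `unfinished_tasks` is never smaller than the number of queued items.

```python
import queue


class ClearableQueue(queue.Queue):
    """clear() drops only the queued items; items already taken by get() stay counted."""

    def clear(self):
        with self.mutex:
            removed = list(self.queue)
            self.queue.clear()
            # Forget only the tasks that were removed. Items a worker already
            # got are still counted, so its later task_done() remains valid.
            self.unfinished_tasks -= len(removed)
            if self.unfinished_tasks == 0:
                self.all_tasks_done.notify_all()   # nothing in flight: wake join()
            self.not_full.notify_all()             # room for blocked put() callers
        return removed
```

Here the "finish_clear" step is performed by `Queue.task_done()` itself: when the last in-flight task is acknowledged, `unfinished_tasks` reaches zero and threads blocked in `join()` are woken.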