Search code examples
pythonmultithreadingasynchronousqueuetimeout

Python Queue: how to delay / prolong / modify timeout for blocking get?


I have a queue.Queue that is filled by a thread. A method tries to receive from this queue with a timeout. Now let's say, that another thread can reset the timeout of our queue-waiting, and if the queue is not fed in time, our receiver function should go on, with the updated timeout. I can achieve this as below, however I had to modify the builtin queue.Queue class, so that the endtime parameter in the get() method can be modified during waiting... Is there any better solution for this? (I don't want to use asyncio...)

from threading import Thread
from queue import Queue, Empty
import time

q = Queue()
TIMEOUT = 1
RESET_TIME = 0.5
PUT_TIME = 1.2
t0 = time.time()

def receive():
    try:
        _res = q.get(block=True, timeout=TIMEOUT)
        print(f'get @ {time.time()-t0}')
        return _res
    except Empty:
        print(f'to @ {time.time()-t0}')
        return None

def feed_queue():
    time.sleep(PUT_TIME)
    print(f'put @ {time.time()-t0}')
    q.put_nowait(42)

def reset_timeout():
    time.sleep(RESET_TIME)
    with q.mutex:
        q.endtime += TIMEOUT
    print(f'reset @ {time.time()-t0}')

if __name__ == '__main__':
    Thread(target=feed_queue).start()
    Thread(target=reset_timeout).start()
    res = receive()
    print('res:', res)

This yields:

reset @ 0.5013222694396973
put @ 1.201164722442627
get @ 1.201164722442627
res: 42

The following modification has been done in queue.py for this to work:

Index: queue.py
===================================================================
--- queue.py    (revision 28725)
+++ queue.py    (working copy)
@@ -52,6 +52,7 @@
         # drops to zero; thread waiting to join() is notified to resume
         self.all_tasks_done = threading.Condition(self.mutex)
         self.unfinished_tasks = 0
+        self.endtime = 0

     def task_done(self):
         '''Indicate that a formerly enqueued task is complete.
@@ -171,9 +172,9 @@
             elif timeout < 0:
                 raise ValueError("'timeout' must be a non-negative number")
             else:
-                endtime = time() + timeout
+                self.endtime = time() + timeout
                 while not self._qsize():
-                    remaining = endtime - time()
+                    remaining = self.endtime - time()
                     if remaining <= 0.0:
                         raise Empty
                     self.not_empty.wait(remaining)


Solution

  • You can create your own class, inheriting Queue and adding the global variable endtime like this:

    class Waszil(Queue):
        def __init__(self, maxsize=0):
            super().__init__(self)
            self.maxsize = maxsize
            self._init(maxsize)
            self.endtime = 0
    

    Then just change q = Queue() to q = Waszil() and you should be good to go.

    EDIT: If you prefer to make inherent thread safety in the Waszil class, you can use threading.Lock like this:

    from threading import Lock
    
    class Waszil(Queue):
        def __init__(self, maxsize=0):
            super().__init__(self)
            self.threadLock = Lock()
            self.maxsize = maxsize
            self._init(maxsize)
            self.endtime = 0
    
        def increment_endtime(self):
            with self.threadLock:
                self.endtime += 1
    

    In this case, instead of your

    with q.mutex:
        q.endtime += TIMEOUT    
    

    you simply call q.increment_endtime()