python multithreading locking wait task-queue

Python condition variable `wait_for` predicate not immediately returned


I'm trying to implement a synchronous endpoint that enqueues a job, waits until the job is finished, then returns the result.

import datetime
from threading import Condition

lock = Condition()

def predicate(job_in_queue):
    # Re-read the job's state, log it, and report whether it is done.
    job_in_queue.refresh()
    print(job_in_queue.get_status())
    print(datetime.datetime.now())
    return job_in_queue.get_status() == "finished"

print(datetime.datetime.now())
with lock:
    if lock.wait_for(lambda: predicate(job), timeout=10):
        print("indeed notified")
    else:
        print("failed to notify")
    print(datetime.datetime.now())
print(datetime.datetime.now())

return job.result

According to the documentation, wait_for(predicate, timeout=None) on a Python condition variable should block at this line until the callable passed in, predicate, returns True, and then proceed to the code that follows.

However, according to my print lines, the predicate is not being checked constantly. It is checked once, at the moment wait_for is first called; then the thread stays idle and checks it a second time only after timeout seconds have elapsed, where timeout is the value I passed in. In my output, the second check happens only after 10 seconds (the timeout value in the code above).

   2021-07-15T13:48:33.98+0800 [APP/PROC/WEB/0] OUT 2021-07-15 05:48:23.954320
   2021-07-15T13:48:33.98+0800 [APP/PROC/WEB/0] OUT queued
   2021-07-15T13:48:33.98+0800 [APP/PROC/WEB/0] OUT 2021-07-15 05:48:23.974196
   2021-07-15T13:48:33.98+0800 [APP/PROC/WEB/0] OUT finished
   2021-07-15T13:48:33.98+0800 [APP/PROC/WEB/0] OUT 2021-07-15 05:48:33.986337
   2021-07-15T13:48:33.98+0800 [APP/PROC/WEB/0] OUT indeed notified
   2021-07-15T13:48:33.98+0800 [APP/PROC/WEB/0] OUT 2021-07-15 05:48:33.987215
   2021-07-15T13:48:33.98+0800 [APP/PROC/WEB/0] OUT 2021-07-15 05:48:33.987233

I further verified that the predicate is not being checked constantly by changing the timeout to 15; again, the predicate was only re-checked after 15 seconds.

Is there a way to have the predicate checked constantly here? Not a while True busy wait, as it would hog the CPU (and never pass code review). Or is threading.Condition.wait_for the right way to go here?


Solution

  • The purpose of a Condition object is to synchronize the activity of two or more threads. When Thread-1 calls Condition.wait, it becomes blocked. Therefore it cannot possibly check to see when the predicate becomes true, as you want it to. Instead, it must wait for Thread-2 to do something that MAY cause the predicate to become true. At that moment, Thread-2 is supposed to notify the Condition object. The call to notify causes Thread-1 to awaken and check the predicate. If the predicate is now true, Thread-1 continues execution; otherwise it blocks again.

    The docs say the following: "Ignoring the timeout feature, calling this method is roughly equivalent to writing:"

        while not predicate():
            cv.wait()

    If you include a timeout value, Thread-1 awakes after the timeout interval regardless of whether another thread has called notify. That causes a check of the predicate, just as you observe.

    If you want to check a predicate with a reasonable frequency, either provide a short enough timeout or make sure that another thread issues notify calls at the proper time.

    But if you're not using it to synchronize two or more threads, Condition is the wrong tool. If you need a tight loop, write a tight loop.
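The two options described above can be sketched roughly as follows. This is a minimal illustration, not the asker's actual setup: `FakeJob`, `wait_with_notify`, and `wait_by_polling` are hypothetical names, and the job here is finished by another thread in the same process. Note that `threading.Condition` only coordinates threads within one process, so if the job runs in a separate worker process, a `notify` from there cannot reach the waiter and only the polling variant applies.

```python
import threading
import time


class FakeJob:
    """Hypothetical stand-in for the question's job object."""

    def __init__(self):
        self._status = "queued"

    def get_status(self):
        return self._status

    def finish(self):
        self._status = "finished"


def wait_with_notify(work_seconds=0.2, timeout=10):
    """The worker thread notifies the condition as soon as the job is done."""
    job = FakeJob()
    cond = threading.Condition()

    def worker():
        time.sleep(work_seconds)  # simulate the job running
        with cond:
            job.finish()
            cond.notify_all()  # wakes the waiter so it re-checks the predicate

    start = time.monotonic()
    thread = threading.Thread(target=worker)
    thread.start()
    with cond:
        ok = cond.wait_for(lambda: job.get_status() == "finished", timeout=timeout)
    elapsed = time.monotonic() - start
    thread.join()
    return ok, elapsed


def wait_by_polling(work_seconds=0.2, poll_interval=0.05, timeout=10):
    """Nobody calls notify, so re-check the predicate every poll_interval seconds."""
    job = FakeJob()
    cond = threading.Condition()
    # The job finishes silently here: no notify is ever issued.
    threading.Timer(work_seconds, job.finish).start()

    start = time.monotonic()
    deadline = start + timeout
    ok = False
    with cond:
        while time.monotonic() < deadline:
            # Each call blocks for at most poll_interval, then re-checks.
            if cond.wait_for(lambda: job.get_status() == "finished",
                             timeout=poll_interval):
                ok = True
                break
    return ok, time.monotonic() - start


if __name__ == "__main__":
    print("notify :", wait_with_notify())
    print("polling:", wait_by_polling())
```

In the first function the waiter returns shortly after the worker finishes, well before the 10-second timeout, because `notify_all` triggers the re-check. In the second, the delay is bounded by `poll_interval` instead; since no thread is being synchronized there, a plain loop with `time.sleep(poll_interval)` would behave much the same, which is the point of the last paragraph above.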