Search code examples
multithreadingthreadpoolmulticorepriority-inversion

Releasing multiple locks without causing priority inversion


Short version: How do I release multiple locks from a single thread, without being preempted halfway through?

I have a program which is designed to run on an N-core machine. It consists of one main thread and N worker threads. Each thread (including the main thread) has a semaphore it can block on. Normally, each worker thread is blocked on decrementing its semaphore, and the main thread is running. Every now and then, though, the main thread should wake up the worker threads to do their thing for a certain amount of time, then block on its own semaphore waiting for them all to go back to sleep. Like so:

def main_thread(n):
    for i = 1 to n:
        worker_semaphore[i] = semaphore(0)
        spawn_thread(worker_thread, i)
    main_semaphore = semaphore(0)

    while True:
        ...do some work...
        workers_to_wake = foo()
        for i in workers_to_wake:
            worker_semaphore[i].increment() # wake up worker n
        for i in workers_to_wake:
            main_semaphore.decrement() # wait for all workers

def worker_thread(i):
    while True:
        worker_semaphore(i).decrement() # wait to be woken
        ...do some work...
        main_semaphore.increment() # report done with step

All well and good. The problem is, one of the woken workers may end up preempting the main thread halfway through waking the workers: This can happen, for instance, when the Windows scheduler decides to boost that worker's priority. This doesn't lead to deadlock, but it is inefficient, because the remainder of the threads stay asleep until the preempting worker finishes its work. It's basically priority inversion, with the main thread waiting on one of the workers, and some of the worker threads waiting on the main thread.

I can probably figure out OS- and scheduler-specific hacks for this, such as disabling priority boosting under Windows, and fiddling about with thread priorities and processor affinities, but I'd like something cross-platform-ish and robust and clean. So: How can I wake up a bunch of threads atomically?


Solution

  • Peter Brittain's solution, plus Anton's suggestion of a "tree-like wakeup", led me to another solution: Chained wakeups. Basically, rather than the main thread doing all the wakeups, it only wakes up one thread; and then each thread is then responsible for waking up the next one. The elegant bit here is that there's only ever one suspended thread ready to run, so threads rarely end up switching cores. In fact, this works fine with strict processor affinities, even if one of the worker threads shares affinity with the main thread.

    The other thing I did was to use an atomic counter that worker threads decrement before sleeping; that way, only the last one wakes the main thread, so there's also no chance of the main thread being woken several times just to do more semaphore waiting.

    workers_to_wake = []
    main_semaphore = semaphore(0)
    num_woken_workers = atomic_integer()
    
    def main_thread(n):
        for i = 1 to n:
            worker_semaphore[i] = semaphore(0)
            spawn_thread(worker_thread, i)
        main_semaphore = semaphore(0)
    
        while True:
            ...do some work...
    
            workers_to_wake = foo()
            num_woken_workers.atomic_set(len(workers_to_wake)) # set completion countdown
            one_to_wake = workers_to_wake.pop()
            worker_semaphore[one_to_wake].increment() # wake the first worker
            main_semaphore.decrement() # wait for all workers
    
    def worker_thread(i):
        while True:
            worker_semaphore[i].decrement() # wait to be woken
            if workers_to_wake.len() > 0: # more pending wakeups
                one_to_wake = workers_to_wake.pop()
                worker_semaphore[one_to_wake].increment() # wake the next worker
    
            ...do some work...
    
            if num_woken_workers.atomic_decrement() == 0: # see whether we're the last one
                main_semaphore.increment() # report all done with step