multithreading, rust, synchronization, locking

How to acquire mutex in specific order effectively?


I need to run code in a critical section in an externally determined order.

I currently have a ticket-based lock that uses a mutex and a condvar to put waiting threads to sleep. Unfortunately, it suffers from the thundering herd problem.

My current code (simplified pseudocode):

struct TicketMutex {
   ticket_dispenser: AtomicUsize, // start as 0
   ticket_serving: AtomicUsize, // start as 0
   unfair_mutex: Mutex,
   condvar: Condvar,
   data: *T,
}

// Note that it is called before locking the mutex.
// The mutex needs to be locked in the same order as the returned values.
// Caller MUST call lock using the returned value.
fn acquire_ticket(time_mutex: &TicketMutex) -> usize {
   time_mutex.ticket_dispenser.fetch_add(1, Relaxed)
}

// This acquires the lock for the given ticket.
fn lock(time_mutex: &TicketMutex, ticket: usize){
   if time_mutex.ticket_serving.load(Acquire) == ticket {
      // Success!
      // 0 syscalls and writes here.
      return;
   }
   
   let unique_lock = time_mutex.unfair_mutex.lock();
   // Here thundering herd occurs because I wake all threads at once. ------ PROBLEM 1
   while time_mutex.ticket_serving.load(Acquire) != ticket {
       time_mutex.condvar.wait(unique_lock);
   }
   // Don't need to hold it during execution
   // because I track lock state by `ticket_serving` field.
   unique_lock.unlock();
}

fn unlock(time_mutex: &TicketMutex, ticket: usize){
   // Note that we can update `ticket_serving` non-atomically
   // because until that update we are in exclusive critical section.
   let next_ticket = ticket + 1;
   time_mutex.ticket_serving.store(next_ticket, Release);

   // The problem here is that I always do 1-2 syscalls    ------ PROBLEM 2
   // even if there is no contention at all (and no waiters).
   // All my attempts to put this behind some condition
   // led to deadlocks.
   let unique_lock = time_mutex.unfair_mutex.lock();
   time_mutex.condvar.notify_all();
   unique_lock.unlock();
}
  1. How can I remove the thundering herd for condvars in the lock function? I am thinking about allocating a separate condvar for each thread as a local variable and putting it into an intrusive linked list with its head in the mutex, but that is tricky and its correctness is hard to verify.
  2. How can I avoid doing syscalls in unlock, at least in the uncontended case?
  3. Maybe there is some other way to implement this? I need to determine the order of execution before locking because I want to run code in the order in which another mutex was locked, and I want to unlock that mutex before locking this one.
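
For reference, here is a runnable sketch of the pseudocode above using `std::sync`, assuming poisoned locks can simply be unwrapped. The helper `run_in_order` and its two logs are hypothetical names added for illustration: each thread takes its ticket while holding an outer mutex, releases that mutex, and only then enters the ticketed critical section.

```rust
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

struct TicketMutex {
    ticket_dispenser: AtomicUsize,
    ticket_serving: AtomicUsize,
    unfair_mutex: Mutex<()>,
    condvar: Condvar,
}

impl TicketMutex {
    fn new() -> Self {
        Self {
            ticket_dispenser: AtomicUsize::new(0),
            ticket_serving: AtomicUsize::new(0),
            unfair_mutex: Mutex::new(()),
            condvar: Condvar::new(),
        }
    }

    fn acquire_ticket(&self) -> usize {
        self.ticket_dispenser.fetch_add(1, Relaxed)
    }

    fn lock(&self, ticket: usize) {
        // Fast path: it is already our turn.
        if self.ticket_serving.load(Acquire) == ticket {
            return;
        }
        let mut guard = self.unfair_mutex.lock().unwrap();
        while self.ticket_serving.load(Acquire) != ticket {
            // Every waiter sleeps on the same condvar (the thundering herd).
            guard = self.condvar.wait(guard).unwrap();
        }
    }

    fn unlock(&self, ticket: usize) {
        self.ticket_serving.store(ticket.wrapping_add(1), Release);
        // Taking the mutex ensures a waiter cannot miss the notification.
        let _guard = self.unfair_mutex.lock().unwrap();
        self.condvar.notify_all();
    }
}

// Hypothetical driver: returns the order in which threads locked the outer
// mutex and the order in which they ran the ticketed critical section.
fn run_in_order(n_threads: usize) -> (Vec<usize>, Vec<usize>) {
    let ticket_mutex = Arc::new(TicketMutex::new());
    let outer_log = Arc::new(Mutex::new(Vec::new()));
    let inner_log = Arc::new(Mutex::new(Vec::new()));
    let handles: Vec<_> = (0..n_threads)
        .map(|id| {
            let tm = Arc::clone(&ticket_mutex);
            let outer = Arc::clone(&outer_log);
            let inner = Arc::clone(&inner_log);
            thread::spawn(move || {
                let ticket = {
                    // The ticket is taken while the outer mutex is held,
                    // so ticket order equals outer locking order.
                    let mut log = outer.lock().unwrap();
                    log.push(id);
                    tm.acquire_ticket()
                }; // outer mutex released here
                tm.lock(ticket);
                inner.lock().unwrap().push(id);
                tm.unlock(ticket);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let outer = outer_log.lock().unwrap().clone();
    let inner = inner_log.lock().unwrap().clone();
    (outer, inner)
}
```

Calling `run_in_order(8)` from `main` should return two identical vectors, since the critical sections run in ticket order.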

Solution

  • I don't know how this question ended up being so hated. Anyway, I came up with 2 solutions myself.

    Since C++ programmers didn't like the question, I won't write pseudocode, just Rust. However, I omit mutex poison handling for brevity.

    Option 1 is simpler and reduces the thundering herd problem, but it still always makes at least one syscall per lock-unlock iteration.

    Option 2 is less trivial, but it avoids syscalls when there is no contention and guarantees that no thundering herd occurs.

    Disclaimer: Fair mutexes (including this one) are susceptible to lock convoys ([1][2]), so keep this in mind if you decide to use it.

    Option 1: Put an array of condvars into the mutex.

    This reduces the effect of the thundering herd problem depending on the size of the array. Basically, the number of threads woken at once in the wait loop is divided by the size of the array, so a large enough array can reduce it to 1.

    // In struct
    condvars: [Condvar; N],
    
    // Waiting
    if ticket != self.serving.load(Acquire) {
      let mut lock_guard = self.unfair_mutex.lock();
      while ticket != self.serving.load(Acquire) {
        lock_guard = self.condvars[ticket % N].wait(lock_guard);
      }
    }
    
    // unlocking
    let next_ticket = ticket + 1;
    self.serving.store(next_ticket, Release);
    let _lock_guard = self.unfair_mutex.lock();
    // Need to notify_all because if there are more than
    // N threads, they may wait on same condvar.
    self.condvars[next_ticket % N].notify_all();
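
A self-contained sketch of Option 1, assuming `std::sync` primitives with poisoning unwrapped. The names `ArrayTicketMutex` and `run_with_condvar_array` are made up for this example, and the array size of 4 is arbitrary. Tickets are handed out up front and the threads are spawned in reverse ticket order so that most of them actually have to wait.

```rust
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

struct ArrayTicketMutex<const N: usize> {
    dispenser: AtomicUsize,
    serving: AtomicUsize,
    unfair_mutex: Mutex<()>,
    condvars: [Condvar; N],
}

impl<const N: usize> ArrayTicketMutex<N> {
    fn new() -> Self {
        Self {
            dispenser: AtomicUsize::new(0),
            serving: AtomicUsize::new(0),
            unfair_mutex: Mutex::new(()),
            condvars: std::array::from_fn(|_| Condvar::new()),
        }
    }

    fn get_ticket(&self) -> usize {
        self.dispenser.fetch_add(1, Relaxed)
    }

    fn lock(&self, ticket: usize) {
        if ticket != self.serving.load(Acquire) {
            let mut guard = self.unfair_mutex.lock().unwrap();
            while ticket != self.serving.load(Acquire) {
                // Only threads whose ticket maps to this slot sleep here.
                guard = self.condvars[ticket % N].wait(guard).unwrap();
            }
        }
    }

    fn unlock(&self, ticket: usize) {
        let next_ticket = ticket.wrapping_add(1);
        self.serving.store(next_ticket, Release);
        let _guard = self.unfair_mutex.lock().unwrap();
        // With more than N waiters, several may share a slot,
        // so notify_all is still required.
        self.condvars[next_ticket % N].notify_all();
    }
}

// Hypothetical driver: returns the tickets in the order in which
// their critical sections actually ran.
fn run_with_condvar_array(n_threads: usize) -> Vec<usize> {
    let mutex = Arc::new(ArrayTicketMutex::<4>::new());
    let log = Arc::new(Mutex::new(Vec::new()));
    let tickets: Vec<usize> = (0..n_threads).map(|_| mutex.get_ticket()).collect();
    let handles: Vec<_> = tickets
        .into_iter()
        .rev() // spawn the highest ticket first to force waiting
        .map(|ticket| {
            let mutex = Arc::clone(&mutex);
            let log = Arc::clone(&log);
            thread::spawn(move || {
                mutex.lock(ticket);
                log.lock().unwrap().push(ticket);
                mutex.unlock(ticket);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let order = log.lock().unwrap().clone();
    order
}
```

With 16 threads and 4 condvars, each wake-up reaches at most the threads sharing one slot instead of all waiters.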
    

    Option 2: Separate condvars per thread on their stacks

    The waiters are chained into an intrusive linked list whose head is an atomic pointer in the mutex. This is safe because each waiter lives until its thread has successfully acquired the lock, and the list is updated only while the internal unfair mutex is held.

    This option is usable only if creating a condvar is cheap. On Windows and Linux, condvars are really cheap, just one usize, so it is OK to create them on demand.

    Main benefits of this solution:

    1. It does not notify any condvar if there are no waiting threads, so there are no syscalls in the uncontended case.
    2. It completely solves the thundering herd problem because it notifies at most 1 thread on unlock.

    I measured its performance in the contended case against the code with notify_all on a single condvar: it has twice the throughput when the number of threads is smaller than the number of CPU cores, and 5 times the throughput when the CPU is oversubscribed (more threads than CPU cores).

    struct TicketedMutex<T> {
        dispenser: AtomicUsize,
        serving: AtomicUsize,
        unfair_mutex: Mutex<()>,
        payload: UnsafeCell<T>,
        // A linked list of all sleeping threads
        // that need to be woken when the mutex is unlocked.
        // They are ordered by ticket.
        // The pointer itself may be read outside of `unfair_mutex`,
        // but list elements must be read or updated only while it is locked.
        // The pointee must be updated inside the `unfair_mutex` lock.
        // Note that each access to a list element is almost certainly
        // an L1 cache miss because the elements are allocated
        // on the stacks of different threads.
        waiters_head_ptr: AtomicPtr<Waiter>,
    }
    
    struct Waiter {
        condvar: Condvar,
        ticket: usize,
        next: Cell<*const Waiter>,
    }
    
    pub struct TicketedMutexGuard<'a, T> {
        ticket: usize,
        ticketed_mutex: &'a TicketedMutex<T>,
        payload: *mut T,
    }
    
    unsafe impl<T: Sync + Send> Sync for TicketedMutex<T> {}
    unsafe impl<T: Sync + Send> Send for TicketedMutex<T> {}
    
    impl<T> TicketedMutex<T> {
        pub fn new(val: T) -> Self {
            Self {
                dispenser: AtomicUsize::new(0),
                serving: AtomicUsize::new(0),
                unfair_mutex: Mutex::new(()),
                payload: UnsafeCell::new(val),
                waiters_head_ptr: AtomicPtr::new(core::ptr::null_mut()),
            }
        }
    
        // Caller must always invoke lock using acquired ticket at some point.
        // It must not call this method before acquiring lock.
        pub unsafe fn get_ticket(&self) -> usize {
            self.dispenser.fetch_add(1, Relaxed)
        }
    
        pub fn lock(&self, ticket: usize) -> TicketedMutexGuard<T> {
            if ticket != self.serving.load(Acquire) {
                let waiter = Waiter {
                    condvar: Condvar::new(),
                    ticket,
                    next: Cell::new(core::ptr::null_mut()),
                };
    
                let mut guard = self.unfair_mutex.lock();
    
                // Sharing `waiter` with other threads is OK because
                // it would live until we acquire lock.
                // After we acquire lock, we remove it from list so other threads would not see it.
                let new_head = unsafe {
                    let old_head: *const _ = self.waiters_head_ptr.load(Relaxed);
                    add_to_linked_list_ordered(old_head, &waiter)
                };
    
                // This is a trick to implement `Acquire` store.
                // `swap` with memory ordering `Acquire` synchronizes us
                // with `Release` `fetch_add` in unlock, therefore preventing
                // reordering of `self.serving.load` in loop with this store.
                self.waiters_head_ptr.swap(new_head as *mut Waiter, Acquire);
    
                while ticket != self.serving.load(Acquire) {
                    guard = waiter
                        .condvar
                        .wait(guard);
                }
    
                // Note that at this point we are owner of minimal ticket,
                // therefore our waiter must be first in linked list.
                // Value of `waiter` is synchronized by `unfair_mutex`.
                self.waiters_head_ptr
                    .store(waiter.next.get() as *mut Waiter, Relaxed);
                // Ensure that it lives until this point.
                drop(waiter);
            }
    
            TicketedMutexGuard {
                ticket,
                ticketed_mutex: self,
                payload: self.payload.get(),
            }
        }
    
        fn unlock(&self, ticket: usize) {
            // Use wrapping add because that is what atomic fetch_add does.
            let next_ticket = ticket.wrapping_add(1);
            // Note that this variable is modified only while we hold the lock,
            // so no concurrent modification is possible.
            // Therefore, there is no need for `fetch_add` or CAS here.
            self.serving.store(next_ticket, Release);
    
            // We change type here because AtomicPtr doesn't support fetch_add.
            let casted_to_usize: &AtomicUsize = unsafe {
                assert_eq!(align_of::<AtomicPtr<Waiter>>(), align_of::<AtomicUsize>());
                assert_eq!(size_of::<AtomicPtr<Waiter>>(), size_of::<AtomicUsize>());
                &*(&self.waiters_head_ptr as *const AtomicPtr<_> as *const _)
            };
    
            // Check if there are any waiters.
            //
            // We use fetch_add(0, Release) to make Release load.
            // Since this operation has Release store part, changing of serving
            // cannot be reordered with this operation.
            //
            // Now, let's prove that this cannot cause deadlocks.
            // 1. If `waiter_head.swap` in lock happens before `fetch_add`, we would see that pointer is not null,
            // and would try to notify waiter.
            // 2. If `fetch_add` happens before `swap`, this thread would leave without notifying the waiter
            // but since fetch_add has Release store part, Acquire load part of `swap` would synchronize with it,
            // so waiter thread is guaranteed to see the value of `serving` when checking condition of `while` loop
            // and would not start waiting on condvar.
            if casted_to_usize.fetch_add(0, Release) == 0 {
                // There are no waiting threads to wake,
                // so let's return without acquiring the mutex
                // or notifying any thread.
                // 0 syscalls case.
                return;
            }
    
            // Note that we need to hold this mutex until notifying a thread
            // because otherwise we may notify it in exact moment between checking condition
            // and starting waiting on condvar, in which case our notification would be lost.
            let _guard = self.unfair_mutex.lock();
    
            // It is OK to use Relaxed ordering here because `unfair_mutex` synchronizes
            // memory accesses to the linked list.
            let waiter_head: *const Waiter = self.waiters_head_ptr.load(Relaxed);
            if waiter_head.is_null() {
                // This can happen if the other thread saw the new serving value after creating its waiter.
                // It has already made progress, so we don't need to notify it.
                return;
            }
    
            unsafe {
                // Since linked list is ordered, it is enough to check the first element.
                if (*waiter_head).ticket != next_ticket {
                    // This can happen in 2 situations:
                    // 1. The next thread has already made progress.
                    // 2. It has not yet checked serving in the if statement, or it is blocked on unfair_mutex.
                    // In both cases, it will see the new serving value, so it will not wait on the condvar.
                    //
                    // Waking threads other than the one holding next_ticket
                    // is the responsibility of the next thread.
                    return;
                }
            }
    
            unsafe {
                // Since this condvar is unique for a thread,
                // we don't need notify_all.
                (*waiter_head).condvar.notify_one();
            }
        }
    }
    
    // Returns new head.
    // # Safety
    // Must be called inside critical section.
    // Elements in linked list must be valid.
    // Value must be removed from list before being destructed.
    unsafe fn add_to_linked_list_ordered(head: *const Waiter, value: &Waiter) -> *const Waiter {
        if head.is_null() {
            return value;
        }
        if unsafe { (*head).ticket > value.ticket } {
            value.next.set(head);
            return value;
        }
    
        unsafe {
            let mut it = head;
            while !(*it).next.get().is_null() && (*(*it).next.get()).ticket < value.ticket {
                it = (*it).next.get();
            }
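            // Link the rest of the list after `value` so that
            // later waiters are not dropped from the list.
            value.next.set((*it).next.get());
            (*it).next.set(value);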
        }
        head
    }
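
The ordered insertion can be checked in isolation. The following is a single-threaded sketch, not the code above: `Node`, `insert_ordered` and `tickets_in_list_order` are hypothetical stand-ins for `Waiter` and `add_to_linked_list_ordered`, with the condvar left out. It sets `value.next` before linking `value` in, so nodes after the insertion point stay reachable.

```rust
use std::cell::Cell;
use std::ptr;

// Stand-in for `Waiter` without the condvar.
struct Node {
    ticket: usize,
    next: Cell<*const Node>,
}

/// Inserts `value` into the list sorted by ticket and returns the new head.
///
/// # Safety
/// Every node reachable from `head` must be alive, and no other thread
/// may touch the list during the call.
unsafe fn insert_ordered(head: *const Node, value: &Node) -> *const Node {
    unsafe {
        // An empty list or a smaller ticket: `value` becomes the head.
        if head.is_null() || (*head).ticket > value.ticket {
            value.next.set(head);
            return value;
        }
        // Find the last node whose ticket is below `value.ticket`.
        let mut it = head;
        while !(*it).next.get().is_null() && (*(*it).next.get()).ticket < value.ticket {
            it = (*it).next.get();
        }
        // Splice `value` in between `it` and its old successor.
        value.next.set((*it).next.get());
        (*it).next.set(value);
        head
    }
}

// Builds a list from tickets in arrival order and returns them in list order.
fn tickets_in_list_order(arrival_order: &[usize]) -> Vec<usize> {
    // The nodes live in this Vec, which is not moved or resized
    // while raw pointers to its elements exist.
    let nodes: Vec<Node> = arrival_order
        .iter()
        .map(|&ticket| Node { ticket, next: Cell::new(ptr::null()) })
        .collect();
    let mut head: *const Node = ptr::null();
    for node in &nodes {
        head = unsafe { insert_ordered(head, node) };
    }
    let mut result = Vec::new();
    let mut it = head;
    while !it.is_null() {
        unsafe {
            result.push((*it).ticket);
            it = (*it).next.get();
        }
    }
    result
}
```

Inserting tickets 5, 9, 7, 1 in that order covers the empty list, the tail, a middle insertion, and a new head.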