Search code examples
rustmutexcondition-variable

How do I create a mutex with two condvars?


I want to build a single-producer multiple-consumer example in Rust, where the producer is bounded to have no more than 10 outstanding items. I modeled a solution in C that uses a a mutex and two condvars. One condvar is to wait the consumers when there is nothing to consume and one condvar is to wait for the producer when the unconsumed items count is greater than say 10. The C code is below.

As I understand it from the Rust docs, there must be a 1-1 connection between std::sync::Mutex and a std::sync::Condvar so I can't make an exact translation of my C solution.

Is there some other way to achieve the same end (that I cannot see) in Rust using std::sync::Mutex and std::sync::Condvar.

#define _GNU_SOURCE
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

//
// This is a simple example of using a mutex and 2 condition variables to
// sync a single writer and multiple readers interacting with a bounded (fixed max size) queue
//
// in this toy example a queue is simulated by an int counter n_resource
//

int n_resource;
pthread_cond_t rdr_cvar;
pthread_cond_t wrtr_cvar;
pthread_mutex_t mutex;

void reader(void* data)
{
    long id = (long)data;
    for(;;) {

        pthread_mutex_lock(&mutex);
        while (n_resource <= 0) {
            pthread_cond_wait(&rdr_cvar, &mutex);
        }
        printf("Reader %ld n_resource = %d\n", id, n_resource);
        --n_resource;
        // if there are still things to read - singla one reader
        if(n_resource > 0) {
            pthread_cond_signal(&rdr_cvar);
        }
        // if there is space for the writer to add another signal the writer
        if(n_resource < 10) {
            pthread_cond_signal(&wrtr_cvar);
        }
        pthread_mutex_unlock(&mutex);
    }
}
void writer(void* data)
{
    for(;;) {

        pthread_mutex_lock(&mutex);
        printf("Writer before while n_resource %d \n", n_resource);
        while (n_resource > 10) {
            pthread_cond_wait(&wrtr_cvar, &mutex);
        }
        printf("Writer after while n_resource %d \n", n_resource);

        ++n_resource;
        // if there is something for a reader to read signal one of the readers.
        if(n_resource > 0) {
            pthread_cond_signal(&rdr_cvar);
        }
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    pthread_t rdr_thread_1;
    pthread_t rdr_thread_2;
    pthread_t wrtr_thread;
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&rdr_cvar, NULL);
    pthread_cond_init(&wrtr_cvar, NULL);
    pthread_create(&rdr_thread_1, NULL, &reader, (void*)1L);
    pthread_create(&rdr_thread_2, NULL, &reader, (void*)2L);
    pthread_create(&wrtr_thread, NULL, &writer, NULL);
    pthread_join(wrtr_thread, NULL);
    pthread_join(rdr_thread_1, NULL);
    pthread_join(rdr_thread_2, NULL);
}

Solution

  • While a CondVar needs to be associated with only one Mutex, it is not necessary that a Mutex is associated with only one CondVar.

    For example, the following code seems to work just fine - you can run it on the playground.

    use std::sync::{Arc, Condvar, Mutex};
    use std::thread;
    
    struct Q {
        rdr_cvar: Condvar,
        wrtr_cvar: Condvar,
        mutex: Mutex<i32>,
    }
    
    impl Q {
        pub fn new() -> Q {
            Q {
                rdr_cvar: Condvar::new(),
                wrtr_cvar: Condvar::new(),
                mutex: Mutex::new(0),
            }
        }
    }
    
    fn writer(id: i32, qq: Arc<Q>) {
        let q = &*qq;
        for i in 0..10 {
            let guard = q.mutex.lock().unwrap();
            let mut guard = q.wrtr_cvar.wait_while(guard, |n| *n > 3).unwrap();
    
            println!("{}: Writer {} n_resource = {}\n", i, id, *guard);
            *guard += 1;
    
            if *guard > 0 {
                q.rdr_cvar.notify_one();
            }
            if *guard < 10 {
                q.wrtr_cvar.notify_one();
            }
        }
    }
    
    fn reader(id: i32, qq: Arc<Q>) {
        let q = &*qq;
        for i in 0..10 {
            let guard = q.mutex.lock().unwrap();
            let mut guard = q.rdr_cvar.wait_while(guard, |n| *n <= 0).unwrap();
    
            println!("{} Reader {} n_resource = {}\n", i, id, *guard);
            *guard -= 1;
    
            if *guard > 0 {
                q.rdr_cvar.notify_one();
            }
            if *guard < 10 {
                q.wrtr_cvar.notify_one();
            }
        }
    }
    
    fn main() {
        let data = Arc::new(Q::new());
        let data2 = data.clone();
    
        let t1 = thread::spawn(move || writer(0, data2));
        let t2 = thread::spawn(move || reader(1, data));
    
        t1.join().unwrap();
        t2.join().unwrap();
    }