Search code examples
multithreading, concurrency, rust, mutex, condition-variable

Buffer in Rust with Mutex and Condvar


I'm trying to implement a buffer with a single consumer and a single producer. So far I have only used POSIX semaphores; however, they're not available in Rust, so I'm trying to solve a trivial semaphore problem with Rust's sync primitives (Mutex, Condvar, Barrier, ...). I don't want to use channels.

My code behaves erratically: sometimes it runs to completion, other times it stops at some number, and in other cases it never starts counting at all.

Things appear to work better if I wait 1 second in the main thread before sending the Condvar notification, but that doesn't guarantee that the program won't deadlock.

How can this program be fixed? Am I understanding Condvars wrong?

use std::thread;
use std::sync::{Arc, Condvar, Mutex};

/// Shared state for a one-slot hand-off between the producer and the consumer.
///
/// The two flags and the slot each live behind their own `Mutex`, and each
/// flag is paired with its own `Condvar`.
struct Buffer {
    // `true` once the producer has stored a value; reset to `false` by the consumer.
    is_data: Mutex<bool>,
    // Notified by the producer right after it sets `is_data`.
    is_data_cv: Condvar,
    // `true` while the slot may be overwritten; reset to `false` by the producer.
    is_space: Mutex<bool>,
    // Notified by the consumer right after it sets `is_space` (and once by `main`).
    is_space_cv: Condvar,
    // The single value being handed over.
    buffer: Mutex<i32>,
}

/// Hands the values `0..50` to the consumer, one at a time.
///
/// For each value the thread blocks on `is_space_cv`; when it wakes up and
/// finds `is_space == true` it stores the value in the slot, clears
/// `is_space`, sets `is_data` and notifies `is_data_cv`. If the flag is still
/// `false` after waking, it goes back to waiting.
///
/// NOTE(review): `wait` is called unconditionally, *before* `is_space` is
/// inspected. `Condvar` notifications are not buffered, so a `notify_one`
/// issued while this thread is not blocked inside `wait` is lost, and the
/// thread can then sleep forever even though the flag is already `true`.
/// Checking the flag first and waiting only while it is `false` avoids this.
fn producer(buffer: Arc<Buffer>) {
    for i in 0..50 {
        loop {
            // Lock `is_space`, then immediately release it and block until notified.
            let mut is_space = buffer
                .is_space_cv
                .wait(buffer.is_space.lock().unwrap())
                .unwrap();
            if *is_space {
                {
                    // Scoped so the slot lock is released as soon as the value is written.
                    let mut hueco = buffer.buffer.lock().unwrap();
                    *hueco = i;
                }

                *is_space = false;
                {
                    // The `is_space` guard is still held while `is_data` is locked here.
                    // NOTE(review): the consumer takes these two locks in the opposite
                    // order (is_data, then is_space) — confirm this cannot deadlock.
                    let mut is_data = buffer.is_data.lock().unwrap();
                    *is_data = true;
                }
                buffer.is_data_cv.notify_one();
                break;
            }
        }
    }
}

/// Receives and prints 50 values from the slot.
///
/// For each round the thread blocks on `is_data_cv`; when it wakes up and
/// finds `is_data == true` it prints the slot's value, clears `is_data`, sets
/// `is_space` and notifies `is_space_cv`. If the flag is still `false` after
/// waking, it goes back to waiting.
///
/// NOTE(review): as in `producer`, `wait` is called unconditionally before the
/// flag is checked. A `notify_one` sent by the producer while this thread is
/// not yet blocked inside `wait` is lost, which leaves both threads waiting.
fn consumer(buffer: Arc<Buffer>) {
    // `i` only counts the rounds; it is never read in the body.
    for i in 0..50 {
        loop {
            // Lock `is_data`, then immediately release it and block until notified.
            let mut is_data = buffer
                .is_data_cv
                .wait(buffer.is_data.lock().unwrap())
                .unwrap();
            if *is_data {
                {
                    // Scoped so the slot lock is released right after printing.
                    let hueco = buffer.buffer.lock().unwrap();
                    println!("{}", *hueco);
                }
                *is_data = false;
                {
                    // The `is_data` guard is still held while `is_space` is locked here.
                    // NOTE(review): the producer takes these two locks in the opposite
                    // order (is_space, then is_data) — confirm this cannot deadlock.
                    let mut is_space = buffer.is_space.lock().unwrap();
                    *is_space = true;
                }
                buffer.is_space_cv.notify_one();
                break;
            }
        }
    }
}

/// Builds the shared `Buffer`, starts one producer and one consumer thread,
/// kicks the producer with a single notification and waits for the consumer.
fn main() {
    // Initial state: no data available, slot free.
    let buffer = Arc::new(Buffer {
        is_data: Mutex::new(false),
        is_data_cv: Condvar::new(),
        is_space: Mutex::new(true),
        is_space_cv: Condvar::new(),
        buffer: Mutex::new(0),
    });
    let b = buffer.clone();
    // NOTE(review): `p` is never joined.
    let p = thread::spawn(move || {
        producer(b);
    });
    let b = buffer.clone();
    let c = thread::spawn(move || {
        consumer(b);
    });

    //thread::sleep_ms(1000);

    // Intended to wake the producer out of its first `wait`.
    // NOTE(review): nothing guarantees the producer is already blocked in
    // `wait` at this point; if it is not, this notification has no effect.
    buffer.is_space_cv.notify_one();
    // NOTE(review): the `Result` returned by `join` is discarded, so a panic
    // in the consumer thread goes unnoticed.
    c.join();
}

Solution

  • I would encourage you to create smaller methods and reuse existing Rust types such as Option. This will allow you to simplify your code quite a bit — only one Mutex and one Condvar:

    use std::thread;
    use std::sync::{Arc, Condvar, Mutex};
    
    /// A one-slot hand-off buffer that can be shared between threads.
    ///
    /// The slot holds `Some(value)` while it is full and `None` while it is
    /// empty. One `Condvar` is used for both "slot became full" and "slot
    /// became empty", so producers and consumers wait on the same queue.
    #[derive(Debug, Default)]
    struct Buffer {
        data: Mutex<Option<i32>>,
        data_cv: Condvar,
    }

    impl Buffer {
        /// Blocks until the slot is empty, then stores `val` in it.
        ///
        /// # Panics
        ///
        /// Panics if the mutex was poisoned by another thread.
        fn insert(&self, val: i32) {
            let mut lock = self.data.lock().expect("Can't lock");
            // Re-check after every wake-up: the slot may have been refilled,
            // and condition variables may wake spuriously.
            while lock.is_some() {
                lock = self.data_cv.wait(lock).expect("Can't wait");
            }
            *lock = Some(val);
            // `notify_all`, not `notify_one`: producers and consumers share this
            // condvar, so waking one arbitrary waiter could pick another
            // producer and leave every consumer asleep once more than one
            // thread of a kind is involved.
            self.data_cv.notify_all();
        }

        /// Blocks until the slot is full, then empties it and returns the value.
        ///
        /// # Panics
        ///
        /// Panics if the mutex was poisoned by another thread.
        fn remove(&self) -> i32 {
            let mut lock = self.data.lock().expect("Can't lock");
            loop {
                // `take` leaves `None` behind, which marks the slot as empty.
                if let Some(val) = lock.take() {
                    // See `insert` for why every waiter is woken.
                    self.data_cv.notify_all();
                    return val;
                }
                lock = self.data_cv.wait(lock).expect("Can't wait");
            }
        }
    }
    
    /// Logs and inserts each of the values `0..50` into `buffer`, in order.
    fn producer(buffer: &Buffer) {
        (0..50).for_each(|value| {
            // Log before inserting, so the line appears even if `insert` blocks.
            println!("p: {}", value);
            buffer.insert(value);
        });
    }
    
    /// Removes 50 values from `buffer`, printing each one as it arrives.
    fn consumer(buffer: &Buffer) {
        let mut remaining = 50;
        while remaining > 0 {
            let received = buffer.remove();
            println!("c: {}", received);
            remaining -= 1;
        }
    }
    
    /// Runs one producer and one consumer thread against a shared `Buffer`
    /// and waits for both to finish.
    fn main() {
        let shared = Arc::new(Buffer::default());

        // Each thread gets its own handle to the same buffer.
        let producer_handle = {
            let shared = Arc::clone(&shared);
            thread::spawn(move || producer(&shared))
        };
        let consumer_handle = {
            let shared = Arc::clone(&shared);
            thread::spawn(move || consumer(&shared))
        };

        // Joining turns a panic in either thread into a panic here.
        consumer_handle.join().expect("Consumer had an error");
        producer_handle.join().expect("Producer had an error");
    }
    

    If you want a bit more performance (benchmark to see whether it's worth it), you could use separate Condvars for the "empty" and "full" conditions:

    /// A one-slot hand-off buffer with separate wake-up queues.
    ///
    /// `data` is `Some(value)` while the slot is full and `None` while it is
    /// empty. Threads waiting for the slot to empty block on `is_empty`;
    /// threads waiting for a value block on `is_full`.
    #[derive(Debug, Default)]
    struct Buffer {
        data: Mutex<Option<i32>>,
        is_empty: Condvar,
        is_full: Condvar,
    }

    impl Buffer {
        /// Blocks until the slot is empty, stores `val`, then wakes one
        /// thread waiting on `is_full`.
        fn insert(&self, val: i32) {
            let guard = self.data.lock().expect("Can't lock");
            // Sleep for as long as the slot is still occupied.
            let mut slot = self
                .is_empty
                .wait_while(guard, |current| current.is_some())
                .expect("Can't wait");
            *slot = Some(val);
            self.is_full.notify_one();
        }

        /// Blocks until the slot is full, empties it, wakes one thread
        /// waiting on `is_empty`, and returns the value.
        fn remove(&self) -> i32 {
            let guard = self.data.lock().expect("Can't lock");
            // Sleep for as long as there is nothing to take.
            let mut slot = self
                .is_full
                .wait_while(guard, |current| current.is_none())
                .expect("Can't wait");
            let taken = slot.take().unwrap();
            self.is_empty.notify_one();
            taken
        }
    }