Search code examples
rustblock

Rust lock-free ring buffer doesn't work in release mode


I implemented a lock-free ring buffer. Testing it in debug mode works fine, but in release mode it doesn't always work.

use std::path::Display;
use std::sync::Arc;


/// A fixed-capacity ring buffer that stores `SIZE` slots inline.
///
/// One slot is always kept free so that "full" can be told apart from
/// "empty"; at most `SIZE - 1` elements can be queued at a time.
///
/// There is no interior synchronization: every mutation goes through
/// `&mut self`, so this type by itself is not a cross-thread (lock-free) queue.
#[derive(Debug)]
pub struct RingBuffer<T, const SIZE: usize> {
    /// Index of the slot the next `push` writes to.
    idx_head: usize,
    /// Index of the slot the next `pop` reads from.
    idx_tail: usize,
    /// Backing storage; as an array of `T`, every slot always holds a `T` value.
    m_data: [T; SIZE],
}

/// Minimal bounded FIFO queue interface.
pub trait Queue<T> {
    /// Builds a queue that contains no elements.
    fn new_empty() -> Self;

    /// Reports whether no element is currently queued.
    fn is_empty(&self) -> bool;

    /// Reports whether the next `push` would be rejected.
    fn is_full(&self) -> bool;

    /// Tries to append `value`; returns `false` if it could not be stored.
    fn push(&mut self, value: T) -> bool;

    /// Advances past the oldest element and hands back a reference to it,
    /// or `None` when nothing is queued.
    fn pop(&mut self) -> Option<&T>;
}

impl<T, const SIZE: usize> Queue<T> for RingBuffer<T, SIZE> {
    /// Creates an empty buffer whose slots are filled with all-zero `T` values.
    ///
    /// Uses the standard library's `std::array::from_fn` rather than the
    /// external `array_init` crate.
    fn new_empty() -> Self {
        RingBuffer {
            idx_head: 0,
            idx_tail: 0,
            // SAFETY: NOT sound for every `T`. An all-zero bit pattern is a valid
            // value only for types such as integers, floats, `bool` and raw
            // pointers; for `&U`, `Box<U>`, `String`, `Vec<U>`, `NonZero*` and
            // similar it is undefined behavior. These zeroed values are also
            // dropped when `push` overwrites a slot.
            // NOTE(review): a `T: Default` bound or `MaybeUninit<T>` slots would
            // remove this hazard, but both change the public API/struct layout.
            m_data: std::array::from_fn(|_| unsafe { std::mem::zeroed() }),
        }
    }

    /// Stores `value` in the head slot and advances the head.
    ///
    /// Returns `false` without storing `value` when `SIZE - 1` elements are
    /// already queued (one slot is kept free to distinguish full from empty).
    fn push(&mut self, value: T) -> bool {
        let next_head = Self::next_index(self.idx_head);
        if next_head == self.idx_tail {
            return false;
        }
        self.m_data[self.idx_head] = value;
        self.idx_head = next_head;
        true
    }

    /// Advances the tail past the oldest element and returns a reference to it.
    ///
    /// The element stays in its slot; the returned borrow of `self` keeps it
    /// from being overwritten while the reference is alive.
    fn pop(&mut self) -> Option<&T> {
        if self.is_empty() {
            return None;
        }
        let slot = self.idx_tail;
        self.idx_tail = Self::next_index(slot);
        Some(&self.m_data[slot])
    }

    /// Returns `true` when `push` would be rejected.
    fn is_full(&self) -> bool {
        // Same wrap rule as `push`, so the two can never disagree.
        self.idx_tail == Self::next_index(self.idx_head)
    }

    /// Returns `true` when no element is queued.
    fn is_empty(&self) -> bool {
        self.idx_head == self.idx_tail
    }
}

impl<T, const SIZE: usize> RingBuffer<T, SIZE> {
    /// Returns the slot index following `idx`, wrapping back to 0 at `SIZE`.
    fn next_index(idx: usize) -> usize {
        let next = idx + 1;
        if next == SIZE {
            0
        } else {
            next
        }
    }
}

/// A cloneable handle to a `RingBuffer` stored behind an `Arc`.
///
/// `Arc` only hands out shared (`&`) access to the buffer; it does not make
/// the buffer safe to mutate through several handles at once.
/// NOTE(review): a real single-producer/single-consumer queue needs atomic
/// indices and `UnsafeCell` slots inside the buffer itself.
pub struct SharedRingBuffer<T, const SIZE: usize> {
    /// The buffer shared by this handle and any clones of it.
    pub ringbuffer: Arc<RingBuffer<T, SIZE>>,
}

impl<T, const Size: usize> Clone for SharedRingBuffer<T, Size> {
    fn clone(&self) -> Self {
        Self {
            ringbuffer: self.ringbuffer.clone(),
        }
    }
}
impl<T, const Size: usize, > Queue<T> for SharedRingBuffer<T, Size> {
    fn new_empty() -> Self {
        Self {
            ringbuffer: Arc::new(RingBuffer::<T, Size>::new_empty()),
        }
    }

    fn push(&mut self, value: T) -> bool {
        unsafe {
            (*Arc::get_mut_unchecked(&mut self.ringbuffer)).push(value)
        }
    }

    fn pop(&mut self) -> Option<&T> {
        unsafe {
            (*Arc::get_mut_unchecked(&mut self.ringbuffer)).pop()
        }
    }

    fn is_full(&self) -> bool {
        self.ringbuffer.is_full()
    }

    fn is_empty(&self) -> bool {
        self.ringbuffer.is_empty()
    }
}
////////////////////// for test//////////////////////////
/// Benchmark: pushes `0..99` and then a `-1` sentinel through a
/// `SharedRingBuffer<i32, 8>` to a consumer thread that sleeps 1 ms per
/// element, and prints the elapsed milliseconds once the sentinel arrives.
///
/// NOTE(review): the producer (this thread) and the consumer each hold a
/// handle to the same `RingBuffer`. The result is only meaningful if
/// `SharedRingBuffer` actually synchronizes access between threads.
fn test_speed1() {
    let mut q: SharedRingBuffer<i32, 8> = SharedRingBuffer::new_empty();
    // `Instant` is monotonic, unlike `SystemTime`, so measuring cannot fail.
    let t0 = std::time::Instant::now();
    let consumer = {
        let mut q = q.clone();
        std::thread::spawn(move || {
            loop {
                // Busy-wait until the producer has published an element.
                let Some(&value) = q.pop() else {
                    std::hint::spin_loop();
                    continue;
                };
                if value == -1 {
                    break;
                }
                std::thread::sleep(std::time::Duration::from_millis(1));
            }
            println!("res: {}", t0.elapsed().as_millis());
        })
    };
    for i in 0..99 {
        while !q.push(i) {
            std::hint::spin_loop();
        }
    }
    // The sentinel must be retried too. A single `push(-1)` that hit a full
    // queue was silently dropped, leaving the consumer spinning forever and
    // `join` hanging.
    while !q.push(-1) {
        std::hint::spin_loop();
    }
    consumer.join().unwrap();
}

When I add `std::thread::sleep(Duration::from_millis(10))` next to the `q.push` and `q.pop` calls, it works well.

rustc 1.67.0-nightly (95a3a7277 2022-10-31)
binary: rustc
commit-hash: 95a3a7277b44bbd2dd3485703d9a05f64652b60e
commit-date: 2022-10-31
host: x86_64-pc-windows-msvc
release: 1.67.0-nightly
LLVM version: 15.0.4

I expect the RingBuffer to work correctly. The equivalent code using a standard channel is:

/// Reference benchmark with the same workload as `test_speed1`, but using a
/// standard `std::sync::mpsc` channel.
///
/// Sends `0..99` and then a `-1` sentinel to a consumer thread that sleeps
/// 1 ms per element. Prints the elapsed milliseconds when the consumer finishes.
fn test_speed2() {
    let (send, recv) = std::sync::mpsc::channel::<i32>();
    let t0 = std::time::Instant::now();
    let consumer = std::thread::spawn(move || {
        // `recv` blocks until a value arrives and errors once the sender is dropped.
        while let Ok(value) = recv.recv() {
            if value == -1 {
                break;
            }
            std::thread::sleep(std::time::Duration::from_millis(1));
        }
        println!("res: {}", t0.elapsed().as_millis());
    });
    for i in 0..99 {
        send.send(i).unwrap();
    }
    send.send(-1).unwrap();
    consumer.join().unwrap();
}

I hope the ring buffer can replace the channel for communication between two threads, because a ring buffer is lock-free and faster.


Solution

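  • Your code causes undefined behavior by creating two mutable references to the same object at the same time via Arc::get_mut_unchecked(). It looks like this was even intentional, but it blatantly violates Rust's aliasing rules. Even when using unsafe, you cannot violate the requirement that mutable references are exclusive. Because the optimizer may assume a `&mut` is exclusive (for example, by keeping `idx_head` in a register instead of re-reading it), the code can appear to work in debug builds or with added sleeps and still fail in release builds; the sleeps only hide the race.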

    Running your code with cargo miri reports this undefined behavior:

    error: Undefined Behavior: Data race detected between Read on thread `<unnamed>` and Write on thread `main` at alloc1894+0x10
       --> bar/src/main.rs:45:12
        |
    45  |         if self.idx_head == tail {
        |            ^^^^^^^^^^^^^ Data race detected between Read on thread `<unnamed>` and Write on thread `main` at alloc1894+0x10
        |
        = help: this indicates a bug in the program: it performed an invalid operation, and caused Undefined Behavior
        = help: see https://doc.rust-lang.org/nightly/reference/behavior-considered-undefined.html for further information
        = note: BACKTRACE:
        = note: inside `<RingBuffer<i32, 8> as Queue<i32>>::pop` at bar/src/main.rs:45:12
    note: inside `<SharedRingBuffer<i32, 8> as Queue<i32>>::pop` at bar/src/main.rs:89:18
       --> bar/src/main.rs:89:18
        |
    89  |         unsafe { (*Arc::get_mut_unchecked(&mut self.ringbuffer)).pop() }
        |                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    note: inside closure at bar/src/main.rs:108:31
       --> bar/src/main.rs:108:31
        |
    108 |                 let t = match q.pop() {
        |                               ^^^^^^^
    

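    You will need to rethink your design. You'll probably need a foundation like this to make it safe to modify from multiple threads. The head and tail indices become atomics that the producer and consumer update with acquire/release ordering. Each slot is wrapped in `UnsafeCell<MaybeUninit<T>>`, so it can be written through a shared reference and does not need a dummy initial value: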

    use std::cell::UnsafeCell;
    use std::mem::MaybeUninit;
    use std::sync::atomic::AtomicUsize;
    
    /// Storage layout intended for a thread-safe ring buffer.
    ///
    /// The indices are atomics so both sides can read and update them through a
    /// shared reference. Each slot is an `UnsafeCell<MaybeUninit<T>>`, so it can
    /// be written through `&self` and may hold no initialized value.
    pub struct RingBuffer<T, const N: usize> {
        idx_head: AtomicUsize,
        idx_tail: AtomicUsize,
        m_data: [UnsafeCell<MaybeUninit<T>>; N],
    }