
How to handle atomic changes to lock-free data structures?


The code below is meant to track the total size of the map and rotate it (clear it) once that total reaches a limit. The problem appears with concurrent calls: I can't keep it from writing far more data than the limit allows. I want to keep the advantages of a lock-free backing store, but I don't know which pattern would help.

use dashmap::DashMap;
use std::sync::RwLock;

struct Store {
    map: DashMap<String, Vec<u8>>,
    size: RwLock<usize>,
}
impl Store {
    pub fn put(&self, key: String, buf: Vec<u8>) {
        let clean_needed = {
            let size = self.size.read().unwrap();
            println!("Size {}", *size);
        *size >= 50000usize
        };
        let buff_size = buf.len();
        if clean_needed {
            let mut size = self.size.write().unwrap();
            *size = 0;
            self.map.clear();
            println!("Store cleaned");
        } else {
            let mut size = self.size.write().unwrap();
            *size += buff_size;
            self.map.entry(key).and_modify(|e| e.extend(buf)).or_insert(Vec::new());
            println!("Buff added: {}", buff_size);
        }
    }
} 

Solution

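  • You have a race condition because you lock size twice: the check happens under a read lock, and the update happens later under a separate write lock, so the result of the check can be stale by the time you act on it. Annotating your code to explain: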

            let clean_needed = {
                // Multiple threads can take a read lock at the same time
                let size = self.size.read().unwrap();
                println!("Size {}", *size);
                // If the size was less than 50000,
                // all would conclude `clean_needed = false`
                *size >= 50000usize
            };
            let buff_size = buf.len();
            if clean_needed {
                let mut size = self.size.write().unwrap();
                *size = 0;
                self.map.clear();
                println!("Store cleaned");
            } else {
                // Then they will each sequentially take a write lock
                let mut size = self.size.write().unwrap();
                // And increment the size
                *size += buff_size;
                // And push their data in
                self.map.entry(key).and_modify(|e| e.extend(buf)).or_insert(Vec::new());
                // causing the size to overshoot the limit
                println!("Buff added: {}", buff_size);
            }
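To see the race deterministically, here is a small std-only sketch (no DashMap) that keeps only the size counter. It assumes a hypothetical LIMIT of 100 and uses std::sync::Barrier to force the interleaving described in the comments above: every thread finishes its read-locked check before any thread takes the write lock. racy_add and run_race are illustrative names, not from the question.

```rust
use std::sync::{Arc, Barrier, RwLock};
use std::thread;

// Hypothetical small limit so the overshoot is easy to see.
const LIMIT: usize = 100;

// Same shape as the original put(): check under a read lock,
// then update under a separate write lock.
fn racy_add(size: &RwLock<usize>, barrier: &Barrier, n: usize) {
    // The read guard is a temporary, dropped at the end of this statement.
    let clean_needed = *size.read().unwrap() >= LIMIT;
    // Make every thread finish its check before any thread writes.
    barrier.wait();
    if clean_needed {
        *size.write().unwrap() = 0;
    } else {
        // Acts on a check that other threads may already have invalidated.
        *size.write().unwrap() += n;
    }
}

// Starts the counter at `start` and lets `threads` threads add `n` each.
fn run_race(start: usize, threads: usize, n: usize) -> usize {
    let size = Arc::new(RwLock::new(start));
    let barrier = Arc::new(Barrier::new(threads));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let size = Arc::clone(&size);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || racy_add(&size, &barrier, n))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // Bind to a local so the guard is dropped before `size` goes out of scope.
    let final_size = *size.read().unwrap();
    final_size
}

fn main() {
    // 90 < 100, so both threads skip the cleanup, then both add 60.
    println!("final size: {}", run_race(90, 2, 60)); // final size: 210
}
```

With a starting size of 90 and two threads adding 60 each, both threads see 90 < 100 and skip the cleanup, so the counter ends at 210, more than double the limit. Without the barrier the same overshoot depends on scheduling and shows up only intermittently, which is what makes this kind of bug hard to reproduce.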
    

    The solution is to hold the write lock for the whole routine, so no other put can run between the check and the update:

    use dashmap::DashMap;
    use std::sync::RwLock;

    struct Store {
        map: DashMap<String, Vec<u8>>,
        size: RwLock<usize>,
    }
    impl Store {
        pub fn put(&self, key: String, mut buf: Vec<u8>) {
            let mut size = self.size.write().unwrap();
    
            let clean_needed = {
                println!("Size {}", *size);
                *size >= 50000usize
            };
            let buff_size = buf.len();
            if clean_needed {
                self.map.clear();
                *size = 0;
                println!("Store cleaned");
            } else {
                self.map
                    .entry(key)
                    .and_modify(|e| e.append(&mut buf))
                    .or_insert(buf);
                *size += buff_size;
                println!("Buff added: {}", buff_size);
            }
        }
    }
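The same fix can be sketched with only the standard library, assuming a plain HashMap is acceptable in place of DashMap. Because put now holds one lock for the whole check-then-update, DashMap's sharded locking no longer adds any concurrency to put, so this version keeps the map and the counter together behind a single Mutex. Inner, Store::new, sizes and the thread counts in main are illustrative, not from the question.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

const LIMIT: usize = 50_000;

// Hypothetical std-only stand-in: the map and its byte count live
// behind one Mutex, so the check and the update happen under one lock.
struct Inner {
    map: HashMap<String, Vec<u8>>,
    size: usize,
}

struct Store {
    inner: Mutex<Inner>,
}

impl Store {
    fn new() -> Self {
        Store { inner: Mutex::new(Inner { map: HashMap::new(), size: 0 }) }
    }

    fn put(&self, key: String, mut buf: Vec<u8>) {
        // Held until the end of put(), so no thread acts on a stale size.
        let mut inner = self.inner.lock().unwrap();
        if inner.size >= LIMIT {
            // Same rotation rule as the answer: clear, reset, drop this buffer.
            inner.map.clear();
            inner.size = 0;
        } else {
            let n = buf.len();
            inner.map.entry(key).and_modify(|e| e.append(&mut buf)).or_insert(buf);
            inner.size += n;
        }
    }

    // Returns (counted size, bytes actually stored) for checking the invariant.
    fn sizes(&self) -> (usize, usize) {
        let inner = self.inner.lock().unwrap();
        let stored: usize = inner.map.values().map(|v| v.len()).sum();
        (inner.size, stored)
    }
}

fn main() {
    let store = Arc::new(Store::new());
    let handles: Vec<_> = (0..8)
        .map(|t| {
            let store = Arc::clone(&store);
            thread::spawn(move || {
                for i in 0..100 {
                    store.put(format!("k{}-{}", t, i % 10), vec![0u8; 1_000]);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let (counted, stored) = store.sizes();
    // The counter always matches what is stored, and exceeds LIMIT
    // by at most one buffer because the check runs before the add.
    assert_eq!(counted, stored);
    assert!(counted < LIMIT + 1_000);
    println!("counted = {}, stored = {}", counted, stored);
}
```

Note that the check runs before the add, so the counter can still end up above 50000 by at most one buffer; if you need a hard cap, compare size + buf.len() against the limit instead.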
    

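    I also fixed your .and_modify(...).or_insert(...) so it uses Vec::append when the entry is occupied and inserts buf directly when it is vacant. Your version inserted an empty Vec::new() for a vacant key, so the first buffer written under each key was discarded.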
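A small std-only sketch of that difference, using std::collections::HashMap, whose entry/and_modify/or_insert chain has the same shape as the DashMap calls above. put_original and put_fixed are hypothetical helper names.

```rust
use std::collections::HashMap;

// Original shape: the closure moves `buf` in, and a vacant key gets an
// empty Vec, so the first buffer written for each key is lost.
fn put_original(map: &mut HashMap<String, Vec<u8>>, key: String, buf: Vec<u8>) {
    map.entry(key).and_modify(|e| e.extend(buf)).or_insert(Vec::new());
}

// Fixed shape: append when occupied, insert `buf` itself when vacant.
fn put_fixed(map: &mut HashMap<String, Vec<u8>>, key: String, mut buf: Vec<u8>) {
    map.entry(key).and_modify(|e| e.append(&mut buf)).or_insert(buf);
}

fn main() {
    let mut a = HashMap::new();
    put_original(&mut a, "k".to_string(), vec![1, 2]);
    put_original(&mut a, "k".to_string(), vec![3]);
    println!("{:?}", a["k"]); // [3]

    let mut b = HashMap::new();
    put_fixed(&mut b, "k".to_string(), vec![1, 2]);
    put_fixed(&mut b, "k".to_string(), vec![3]);
    println!("{:?}", b["k"]); // [1, 2, 3]
}
```

The &mut buf borrow taken by the and_modify closure ends when and_modify returns, which is why buf can still be moved into or_insert afterwards.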
