The code below is meant to track the total size of the data in the map and to rotate it (clear everything) once that total passes a threshold. The problem is concurrent calls: I can't avoid writing far more data than I expected. I want to keep the advantages of a lock-free backing data store, but I don't know which pattern can help me.
struct Store {
    map: DashMap<String, Vec<u8>>,
    size: RwLock<usize>,
}

impl Store {
    pub fn put(&self, key: String, buf: Vec<u8>) {
        let clean_needed = {
            let size = self.size.read().unwrap();
            println!("Size {}", *size);
            *size >= 50000usize
        };
        let buff_size = buf.len();
        if clean_needed {
            let mut size = self.size.write().unwrap();
            *size = 0;
            self.map.clear();
            println!("Store cleaned");
        } else {
            let mut size = self.size.write().unwrap();
            *size += buff_size;
            self.map.entry(key).and_modify(|e| e.extend(buf)).or_insert(Vec::new());
            println!("Buff added: {}", buff_size);
        }
    }
}
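You have a race condition because you lock `size` multiple times: the read lock used for the check is released before the write lock for the update is taken, so other threads can get in between. Annotating your code to explain: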
let clean_needed = {
    // Multiple threads can take a read lock at the same time
    let size = self.size.read().unwrap();
    println!("Size {}", *size);
    // If the size was less than 50000,
    // all of them would conclude `clean_needed = false`
    *size >= 50000usize
};
let buff_size = buf.len();
if clean_needed {
    let mut size = self.size.write().unwrap();
    *size = 0;
    self.map.clear();
    println!("Store cleaned");
} else {
    // Then they will each sequentially take a write lock
    let mut size = self.size.write().unwrap();
    // And increment the size
    *size += buff_size;
    // And push their data in
    self.map.entry(key).and_modify(|e| e.extend(buf)).or_insert(Vec::new());
    // Causing the size to overflow the limit
    println!("Buff added: {}", buff_size);
}
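To make that interleaving concrete, here is a minimal sketch that forces it deterministically. It is not your `Store`: the function `racy_total` and the `Barrier` are illustrative additions, and the map is left out so that only the size bookkeeping is shown. Every thread performs its check under a read lock, waits until all the others have checked too, and only then takes the write lock.

```rust
use std::sync::{Arc, Barrier, RwLock};
use std::thread;

const LIMIT: usize = 50_000;

/// Hypothetical helper: runs `threads` writers that each check the limit
/// under a read lock, wait for every other writer to finish its check,
/// and only then add `buf_len` under a write lock. Returns the final size.
fn racy_total(start: usize, threads: usize, buf_len: usize) -> usize {
    let size = Arc::new(RwLock::new(start));
    let barrier = Arc::new(Barrier::new(threads));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let size = Arc::clone(&size);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                // Step 1: the check. The read guard is a temporary and is
                // released at the end of this statement.
                let clean_needed = *size.read().unwrap() >= LIMIT;
                // Force the unlucky interleaving: nobody writes until
                // everybody has read.
                barrier.wait();
                // Step 2: the update, based on a check that is now stale.
                let mut guard = size.write().unwrap();
                if clean_needed {
                    *guard = 0;
                } else {
                    *guard += buf_len;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    let total = *size.read().unwrap();
    total
}

fn main() {
    let total = racy_total(49_000, 8, 10_000);
    println!("final size: {total} (limit {LIMIT})");
}
```

With a starting size of 49,000, eight threads, and 10,000-byte buffers, all eight threads see a size below the limit, so the final size is 49,000 + 8 × 10,000 = 129,000. In real code the barrier is not there, so the overshoot depends on timing, but nothing prevents it.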
The solution is to hold the write lock for the whole routine:
use dashmap::DashMap;
use std::sync::RwLock;

struct Store {
    map: DashMap<String, Vec<u8>>,
    size: RwLock<usize>,
}

impl Store {
    pub fn put(&self, key: String, mut buf: Vec<u8>) {
        // Take the write lock once and hold it until the function returns,
        // so no other thread can get between the check and the update.
        let mut size = self.size.write().unwrap();
        let clean_needed = {
            println!("Size {}", *size);
            *size >= 50000usize
        };
        let buff_size = buf.len();
        if clean_needed {
            self.map.clear();
            *size = 0;
            println!("Store cleaned");
        } else {
            self.map
                .entry(key)
                .and_modify(|e| e.append(&mut buf))
                .or_insert(buf);
            *size += buff_size;
            println!("Buff added: {}", buff_size);
        }
    }
}
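If you want to try the pattern without pulling in `dashmap`, the following is a rough, std-only sketch of the same idea. It is a stand-in, not the code above: it swaps `DashMap` for a plain `HashMap` and puts the map and the size behind one `Mutex` (so it is not lock-free), and `Inner`, the returned size, and `max_observed_size` are names made up for this illustration. What it keeps is the important part: the check and the update happen inside a single critical section.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

const LIMIT: usize = 50_000;

// Assumption for this sketch: map and size live behind the same lock.
#[derive(Default)]
struct Inner {
    map: HashMap<String, Vec<u8>>,
    size: usize,
}

#[derive(Default)]
struct Store {
    inner: Mutex<Inner>,
}

impl Store {
    /// Check and update under one lock. Returns the tracked size after
    /// this call (an addition for the sketch, so callers can observe it).
    fn put(&self, key: String, mut buf: Vec<u8>) -> usize {
        let mut inner = self.inner.lock().unwrap();
        if inner.size >= LIMIT {
            // Same behaviour as the code above: clear, and drop `buf`.
            inner.map.clear();
            inner.size = 0;
        } else {
            inner.size += buf.len();
            inner
                .map
                .entry(key)
                .and_modify(|e| e.append(&mut buf))
                .or_insert(buf);
        }
        inner.size
    }
}

/// Hypothetical driver: hammers the store from several threads and
/// returns the largest size any call observed.
fn max_observed_size(threads: usize, puts_per_thread: usize, buf_len: usize) -> usize {
    let store = Arc::new(Store::default());
    let handles: Vec<_> = (0..threads)
        .map(|t| {
            let store = Arc::clone(&store);
            thread::spawn(move || {
                (0..puts_per_thread)
                    .map(|i| store.put(format!("key-{t}-{}", i % 4), vec![0u8; buf_len]))
                    .max()
                    .unwrap_or(0)
            })
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .max()
        .unwrap_or(0)
}

fn main() {
    let max = max_observed_size(8, 100, 1_000);
    println!("largest size observed: {max} (limit {LIMIT})");
}
```

Because a buffer is only added while the size is still below `LIMIT`, the tracked size can never exceed `LIMIT - 1 + buf_len`, no matter how the threads interleave.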
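I also fixed your `.and_modify(...).or_insert(...)` chain so that it uses `Vec::append` if the entry is occupied, and just inserts `buf` directly if the entry was vacant. Your original version inserted an empty `Vec::new()` for a vacant entry, so the first buffer written under each key was lost.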
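The difference between the two chains can be seen in a small sketch. It uses `std::collections::HashMap` as a stand-in, since its entry API has the same `and_modify` / `or_insert` shape as the calls used here; `put_original` and `put_fixed` are names invented for the comparison.

```rust
use std::collections::HashMap;

/// The chain from the question: a vacant entry gets an empty Vec,
/// so the first buffer for a key is silently dropped.
fn put_original(map: &mut HashMap<String, Vec<u8>>, key: &str, buf: Vec<u8>) {
    map.entry(key.to_string())
        .and_modify(|e| e.extend(buf))
        .or_insert(Vec::new());
}

/// The fixed chain: append to an occupied entry, or insert `buf`
/// itself when the entry is vacant.
fn put_fixed(map: &mut HashMap<String, Vec<u8>>, key: &str, mut buf: Vec<u8>) {
    map.entry(key.to_string())
        // `append` moves the bytes out of `buf`, leaving it empty.
        .and_modify(|e| e.append(&mut buf))
        // Only used when the entry was vacant, so `buf` is still full.
        .or_insert(buf);
}

fn main() {
    let mut original = HashMap::new();
    put_original(&mut original, "k", vec![1, 2, 3]);
    put_original(&mut original, "k", vec![4, 5]);
    println!("original: {:?}", original["k"]);

    let mut fixed = HashMap::new();
    put_fixed(&mut fixed, "k", vec![1, 2, 3]);
    put_fixed(&mut fixed, "k", vec![4, 5]);
    println!("fixed:    {:?}", fixed["k"]);
}
```

After the same two writes, the original chain holds only `[4, 5]` under the key, while the fixed chain holds `[1, 2, 3, 4, 5]`.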