I want to prevent concurrent execution of a function that is called asynchronously.
The function is called from a hyper service, and when two connections come in, one call should wait until the other is done. I thought implementing a Future that blocks execution until the other threads / connections are done would solve the issue. My problem: I store the futures in a Mutex<HashMap<i64, LockFut>>,
but when I lock the mutex to get and await the LockFut, the compiler complains that the MutexGuard is not Send. I don't know how to work around this, or whether my approach is just bad.
|
132 | let mut locks = LOCKS.lock().unwrap();
| --------- has type `std::sync::MutexGuard<'_, std::collections::HashMap<i64, hoster::hoster::LockFut>>`
...
136 | lock.await;
| ^^^^^^^^^^ await occurs here, with `mut locks` maybe used later
137 | }
| - `mut locks` is later dropped here
This is my Future implementation:
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

lazy_static! {
    static ref LOCKS: Mutex<HashMap<i64, LockFut>> = Mutex::new(HashMap::new());
}

struct LockState {
    waker: Option<Waker>,
    locked: bool,
}

struct LockFut {
    state: Arc<Mutex<LockState>>,
}

impl Future for LockFut {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.state.lock().unwrap();
        match state.locked {
            false => Poll::Ready(()),
            true => {
                state.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

impl LockFut {
    fn new() -> LockFut {
        LockFut {
            state: Arc::new(Mutex::new(LockState {
                locked: false,
                waker: None,
            })),
        }
    }

    pub fn release_lock(&mut self) {
        let mut state = self.state.lock().unwrap();
        state.locked = false;
        if let Some(waker) = state.waker.take() {
            waker.wake();
        }
    }

    pub async fn lock(id: i64) {
        let mut locks = LOCKS.lock().unwrap();
        // Get the existing lock for this id or create a new one
        let lock = locks.entry(id).or_insert(LockFut::new());
        // Wait for the potential lock to be released
        lock.await;
    }

    pub fn unlock(id: i64) {
        match LOCKS.lock().unwrap().get_mut(&id) {
            Some(lock) => lock.release_lock(),
            None => warn!("No lock found for: {}", id),
        };
    }
}
And this is how I call it:
async fn is_concurrent(id: i64) {
    should_not_be_concurrent(id).await;
}

async fn should_not_be_concurrent(id: i64) {
    LockFut::lock(id).await;
    // Do crazy stuff
    LockFut::unlock(id);
}
The guard of the standard Mutex is indeed !Send, so it cannot be carried across an .await. For such a task, asynchronous mutexes are usually the thing to consider: there's one in futures, and there's also a stand-alone crate. Their guards are Send, so the problem should be resolved at that point.
But I'd like to go further and say that LockFut solves the exact same problem as an async Mutex does. Thus, for this particular example, the code can be significantly simplified to the following (playground):
use std::collections::HashMap;
use std::sync::{Arc, Mutex as StdMutex};

use futures::lock::Mutex;

#[derive(Default)]
struct State { /* .. */ }

type SharedState = Arc<Mutex<State>>;

lazy_static! {
    static ref LOCKS: StdMutex<HashMap<i64, SharedState>> = Default::default();
}

fn acquire_state(id: i64) -> SharedState {
    Arc::clone(LOCKS.lock().unwrap().entry(id).or_default())
}
// Acquiring is straightforward:
let mut state = acquire_state(0).lock().await;
// or with your functions:
async fn is_concurrent(id: i64) {
    should_not_be_concurrent(id).await;
}

async fn should_not_be_concurrent(id: i64) {
    let mut state = acquire_state(id).lock().await;
    // Do crazy stuff.
    // As a bonus, there's no need for manual unlocking here,
    // since dropping `state` unlocks the mutex.
}
Also, you may find this blog post about mutexes in async code helpful.