concurrency - 防止并发执行
问题描述
我想防止同时执行一个异步调用的函数。该函数从超级服务调用,两个连接应该导致一个等待另一个函数调用完成。我认为实现一个 Future 来阻止执行,直到其他线程/连接完成将解决这个问题。谈到我的问题,我将 Futures 存储在 aMutex<HashMap<i64, LockFut>>
但是当我锁定互斥锁以获取并等待 LockFut 时,它显然抱怨 MutexGuard 没有被发送。我不知道如何解决这个问题,或者我的方式很糟糕。
|
132 | let mut locks = LOCKS.lock().unwrap();
| --------- has type `std::sync::MutexGuard<'_, std::collections::HashMap<i64, hoster::hoster::LockFut>>`
...
136 | lock.await;
| ^^^^^^^^^^ await occurs here, with `mut locks` maybe used later
137 | }
| - `mut locks` is later dropped here
这是我未来的实现
lazy_static! {
static ref LOCKS: Mutex<HashMap<i64, LockFut>> = Mutex::new(HashMap::new());
}
struct LockState {
waker: Option<Waker>,
locked: bool
}
struct LockFut {
state: Arc<Mutex<LockState>>
}
impl Future for LockFut {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut state = self.state.lock().unwrap();
match state.locked {
false => {
Poll::Ready(())
},
true => {
state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
}
impl LockFut {
fn new() -> LockFut {
LockFut {
state: Arc::new(Mutex::new(LockState {
locked: false,
waker: None
}))
}
}
pub fn release_lock(&mut self) {
let mut state = self.state.lock().unwrap();
state.locked = false;
if let Some(waker) = state.waker.take() {
waker.wake();
}
}
pub async fn lock<'a>(id: i64) {
let mut locks = LOCKS.lock().unwrap();
// Wait for existing lock to be unlocked or create a new lock
let lock = locks.entry(id).or_insert(LockFut::new());
// Wait for the potential lock to be released
lock.await;
}
pub fn unlock(id: i64) {
match LOCKS.lock().unwrap().get_mut(&id) {
Some(lock) => lock.release_lock(),
None => warn!("No lock found for: {}", id)
};
}
}
这就是我所说的
async fn is_concurrent(id: i64) {
should_not_be_concurrent().await;
}
async fn should_not_be_concurrent(id: i64) {
LockFut::lock(id).await;
// Do crazy stuff
LockFut::unlock(id);
}
解决方案
来自标准Mutex
的保护!Send
确实是,所以它不能在await
-s 之间进行。对于该任务,通常需要考虑异步互斥锁。里面有一个,futures
还有一个独立的板条箱。他们的守卫是Send
,问题应该在这一点上解决。
但我想更进一步说,它LockFut
解决了与异步互斥锁完全相同的问题。因此,对于这个特定的示例代码,可以显着简化为以下(游乐场):
use std::sync::Mutex as StdMutex;
use futures::lock::Mutex;
#[derive(Default)]
struct State { .. }
type SharedState = Arc<Mutex<State>>;
lazy_static! {
static ref LOCKS: StdMutex<HashMap<i64, SharedState>> = Default::default();
}
fn acquire_state<'a>(id: i64) -> SharedState {
Arc::clone(&LOCKS.lock().unwrap().entry(id).or_default())
}
// Acquiring is straightforward:
let mut state = acquire_state(0).lock().await;
// or with your functions:
async fn is_concurrent(id: i64) {
should_not_be_concurrent(id).await;
}
async fn should_not_be_concurrent(id: i64) {
let mut state = acquire_state(id).lock().await;
// Do crazy stuff
// As a bonus there's no need in manual unlocking here
// since `drop(state)` unlocks the mutex.
}
此外,您可能会发现这篇关于异步代码中的互斥锁的博文很有帮助。
推荐阅读
- python - python中范围值的平均值
- c++ - C++ 中 if/else 分支的编译时消除
- python - 使用 FormView 保存在 django 表单中
- javascript - 将一个 div 和一个链接插入 div 两次,并使用多个链接进行此操作?JS
- python - 使用系列作为输入,如何在 pandas 数据框中找到具有匹配值的行?例如 df.loc[系列]?
- excel - 是否可以将多个单元格合并为一个同时省略空单元格?
- sqlite - 带有 SQLITE3 的 GO API 无法从数据库中删除元组
- groovy - 在 Spock “cleanup” 方法中获取测试结果
- android - 无法使用合并标签解析布局
- c - Printf stmt 如何执行 printf("%*.*s",10,7,str); 在详细解释