首页 > 解决方案 > 防止并发执行

问题描述

我想防止同时执行一个异步调用的函数。该函数从超级服务调用,两个连接应该导致一个等待另一个函数调用完成。我认为实现一个 Future 来阻止执行,直到其他线程/连接完成将解决这个问题。谈到我的问题,我将 Futures 存储在 aMutex<HashMap<i64, LockFut>>但是当我锁定互斥锁以获取并等待 LockFut 时,它显然抱怨 MutexGuard 没有被发送。我不知道如何解决这个问题,或者我的方式很糟糕。

    |
132 |         let mut locks = LOCKS.lock().unwrap();
    |             --------- has type `std::sync::MutexGuard<'_, std::collections::HashMap<i64, hoster::hoster::LockFut>>`
...
136 |         lock.await;
    |         ^^^^^^^^^^ await occurs here, with `mut locks` maybe used later
137 |     }
    |     - `mut locks` is later dropped here

这是我未来的实现

lazy_static! {
   static ref LOCKS: Mutex<HashMap<i64, LockFut>> = Mutex::new(HashMap::new());
}

struct LockState {
    waker: Option<Waker>,
    locked: bool
}

struct LockFut {
    state: Arc<Mutex<LockState>>
}

impl Future for LockFut {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.state.lock().unwrap();
        match state.locked {
            false => {
                Poll::Ready(())
            },
            true => {
                state.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

impl LockFut {
    fn new() -> LockFut {
        LockFut {
            state: Arc::new(Mutex::new(LockState {
                locked: false,
                waker: None
            }))
        }
    }

    pub fn release_lock(&mut self) {
        let mut state = self.state.lock().unwrap();
        state.locked = false;
        if let Some(waker) = state.waker.take() {
            waker.wake();
        }
    }

    pub async fn lock<'a>(id: i64) {
        let mut locks = LOCKS.lock().unwrap();
        // Wait for existing lock to be unlocked or create a new lock
        let lock = locks.entry(id).or_insert(LockFut::new());
        // Wait for the potential lock to be released
        lock.await;
    }

    pub fn unlock(id: i64) {
        match LOCKS.lock().unwrap().get_mut(&id) {
            Some(lock) => lock.release_lock(),
            None => warn!("No lock found for: {}", id)
        };
    }
}

这就是我所说的

async fn is_concurrent(id: i64) {
  should_not_be_concurrent().await;
}

async fn should_not_be_concurrent(id: i64) {
  LockFut::lock(id).await;
  // Do crazy stuff
  LockFut::unlock(id);
}

标签: concurrencyrustasync-awaithyper

解决方案


来自标准Mutex的保护!Send确实是,所以它不能在await-s 之间进行。对于该任务,通常需要考虑异步互斥锁。里面有一个,futures还有一个独立的板条箱。他们的守卫是Send,问题应该在这一点上解决。

但我想更进一步说,它LockFut解决了与异步互斥锁完全相同的问题。因此,对于这个特定的示例代码,可以显着简化为以下(游乐场):

use std::sync::Mutex as StdMutex;
use futures::lock::Mutex;

#[derive(Default)]
struct State { .. }

type SharedState = Arc<Mutex<State>>;

lazy_static! {
   static ref LOCKS: StdMutex<HashMap<i64, SharedState>> = Default::default();
}

fn acquire_state<'a>(id: i64) -> SharedState {
    Arc::clone(&LOCKS.lock().unwrap().entry(id).or_default())
}


// Acquiring is straightforward:
let mut state = acquire_state(0).lock().await;


// or with your functions:
async fn is_concurrent(id: i64) {
    should_not_be_concurrent(id).await;
}

async fn should_not_be_concurrent(id: i64) {
    let mut state = acquire_state(id).lock().await;
    // Do crazy stuff

    // As a bonus there's no need in manual unlocking here
    // since `drop(state)` unlocks the mutex. 
}

此外,您可能会发现这篇关于异步代码中的互斥锁的博文很有帮助。


推荐阅读