首页 > 解决方案 > 如何在线程共享的简单 fifo 上通知和等待 condvar

问题描述

我设计了一个简单的fifo,它存储a DequePolicy,如果元素太多,它可以从后面删除元素:

use super::deque_policy::DequePolicy;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Condvar;

pub struct BoundedFifo<T> {
    pub deque: VecDeque<T>,
    pub policy: Box<dyn DequePolicy<T>>,
    //pub condvar: Condvar
}

impl<T> BoundedFifo<T> {
    pub fn new(policy: Box<dyn DequePolicy<T>>) -> BoundedFifo<T> {
        BoundedFifo{
            deque: VecDeque::<T>::new(),
            policy: policy,
            //condvar: Condvar::new()
        }
    }
}

impl<T> BoundedFifo<T> {
    pub fn pop_front(&mut self) -> Option<T> {
        self.policy.before_pop_front(&self.deque);
        //self.condvar.notify_all();
        self.deque.pop_front()
    }
    pub fn push_back(&mut self, t: T) {
        self.deque.push_back(t);
        //self.condvar.notify_all();
        self.policy.after_push_back(&mut self.deque);
    }
}

我想在 2 个线程之间共享这个 fifo,但是有一个问题。我希望一个线程能够等待元素出现在 fifo 上。这就是为什么我添加了condvar现在评论的内容。

我评论它是因为我很快意识到要fifo在线程之间共享,我必须把它放在一个Arc<Mutex<>>. 因此,例如,为了等待condvar它在内部fifo: Arc<Mutex<BoundedFifo<u8>>>,我必须将它锁定在一个线程上:fifo.lock().unwrap().condvar.wait_timeout(...). 这基本上可以防止其他线程写入,因此没有必要等待。

然后我就有了这样做的想法:

type Fifo<T> = Arc<(Mutex<BoundedFifo<T>>, Condvar)>;

现在我可以等待一个 condvar,但我还必须记住每当我调用fifo: Fifo<T>'spush_back和时通知这个 condvar pop_front。例如:

let pair = ...
let fifo = pair.0;
let condvar = pair.1;
fifo.push_back(0);
condvar.notify_all();

这很容易出错,因为我很容易忘记调用它。在 C++ 上,它会让我等待共享事物中的某些内容(即使这是不安全的)。

我考虑过创建宏push_back!pop_front!这将push_backpop_front调用 一起调用notify_allcondvar但我觉得这不是很优雅。

我对 Rust 有点陌生,所以我正在为此寻求更好的解决方案。例如,是否有可能实现push_backpop_frontfor Fifo<T>?这样我就避免了宏:slight_smile:。这是一个想法。

标签: multithreadingrust

解决方案


让你的线程绕过一个Arc<BoundedFifo<T>>. 它将使用内部可变性,并在其面向公众的方法中自行完成所有互斥锁和条件操作,这些方法只需要&self. 您将需要的所有ConditionMutexpolicy方法只需要对它们的不可变引用。

并且不要制作 struct fields pub

我不明白你的policy对象在做什么。

pub struct BoundedFifo<T> {
    deque: Mutex<VecDeque<T>>,
    policy: Box<dyn DequePolicy<T>>,
    condition: Condition,
}

impl<T> BoundedFifo<T> {
    /// pops a value off the front, if one is available
    /// waits only long enough to get the lock
    pub fn pop_front_nonwaiting(&self) -> Option<T> {
        let lock = self.deque.lock().unwrap();
        (*lock).pop_front()
    }

    /// waits until an element is available, then pops it.
    pub fn pop_front_waiting(&self) -> T {

        let lock = self.deque.lock().unwrap();
        let lock = self.condition.wait_while(lock,
                                             |dq| dq.len() == 0));
        self.condition.notify_all();
        // this will be safe because we know len() != 0
        (*lock).pop_front().unwrap()
    }
    pub fn push_back(&self, t: T) {
        let lock = self.deque.lock().unwrap();
        (*lock).push_back(t);
        // overflow/discarding logic goes here
        self.condition.notify_all();
    }
}

推荐阅读