multithreading - 如何在线程共享的简单 fifo 上通知和等待 condvar
问题描述
我设计了一个简单的fifo,它存储a DequePolicy
,如果元素太多,它可以从后面删除元素:
use super::deque_policy::DequePolicy;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Condvar;
pub struct BoundedFifo<T> {
pub deque: VecDeque<T>,
pub policy: Box<dyn DequePolicy<T>>,
//pub condvar: Condvar
}
impl<T> BoundedFifo<T> {
pub fn new(policy: Box<dyn DequePolicy<T>>) -> BoundedFifo<T> {
BoundedFifo{
deque: VecDeque::<T>::new(),
policy: policy,
//condvar: Condvar::new()
}
}
}
impl<T> BoundedFifo<T> {
pub fn pop_front(&mut self) -> Option<T> {
self.policy.before_pop_front(&self.deque);
//self.condvar.notify_all();
self.deque.pop_front()
}
pub fn push_back(&mut self, t: T) {
self.deque.push_back(t);
//self.condvar.notify_all();
self.policy.after_push_back(&mut self.deque);
}
}
我想在 2 个线程之间共享这个 fifo,但是有一个问题。我希望一个线程能够等待元素出现在 fifo 上。这就是为什么我添加了condvar
现在评论的内容。
我评论它是因为我很快意识到要fifo
在线程之间共享,我必须把它放在一个Arc<Mutex<>>
. 因此,例如,为了等待condvar
它在内部fifo: Arc<Mutex<BoundedFifo<u8>>>
,我必须将它锁定在一个线程上:fifo.lock().unwrap().condvar.wait_timeout(...)
. 这基本上可以防止其他线程写入,因此没有必要等待。
然后我就有了这样做的想法:
type Fifo<T> = Arc<(Mutex<BoundedFifo<T>>, Condvar)>;
现在我可以等待一个 condvar,但我还必须记住每当我调用fifo: Fifo<T>
'spush_back
和时通知这个 condvar pop_front
。例如:
let pair = ...
let fifo = pair.0;
let condvar = pair.1;
fifo.push_back(0);
condvar.notify_all();
这很容易出错,因为我很容易忘记调用它。在 C++ 上,它会让我等待共享事物中的某些内容(即使这是不安全的)。
我考虑过创建宏push_back!
,pop_front!
这将push_back
与pop_front
调用 一起调用notify_all
,condvar
但我觉得这不是很优雅。
我对 Rust 有点陌生,所以我正在为此寻求更好的解决方案。例如,是否有可能实现push_back
和pop_front
for Fifo<T>
?这样我就避免了宏:slight_smile:。这是一个想法。
解决方案
让你的线程绕过一个Arc<BoundedFifo<T>>
. 它将使用内部可变性,并在其面向公众的方法中自行完成所有互斥锁和条件操作,这些方法只需要&self
. 您将需要的所有Condition
、Mutex
和policy
方法只需要对它们的不可变引用。
并且不要制作 struct fields pub
。
我不明白你的policy
对象在做什么。
pub struct BoundedFifo<T> {
deque: Mutex<VecDeque<T>>,
policy: Box<dyn DequePolicy<T>>,
condition: Condition,
}
impl<T> BoundedFifo<T> {
/// pops a value off the front, if one is available
/// waits only long enough to get the lock
pub fn pop_front_nonwaiting(&self) -> Option<T> {
let lock = self.deque.lock().unwrap();
(*lock).pop_front()
}
/// waits until an element is available, then pops it.
pub fn pop_front_waiting(&self) -> T {
let lock = self.deque.lock().unwrap();
let lock = self.condition.wait_while(lock,
|dq| dq.len() == 0));
self.condition.notify_all();
// this will be safe because we know len() != 0
(*lock).pop_front().unwrap()
}
pub fn push_back(&self, t: T) {
let lock = self.deque.lock().unwrap();
(*lock).push_back(t);
// overflow/discarding logic goes here
self.condition.notify_all();
}
}
推荐阅读
- mongodb-query - mongodb查询需要改进
- html - 使 flexbox 高度中的 img 100% & 响应式、不一致的 flexbox 渲染结果
- javascript - 我应该在 React 清理期间删除“一次”注册的事件侦听器吗
- java - 为什么带有 for 循环的 String s=null vs String s="null" 给出相同的值?
- angular - core.js:183 Uncaught TypeError:没有'new'就不能调用类构造函数MatCommonModule
- powerbi - Power BI Slicer 不应筛选完整表
- reactjs - 使用 React 钩子在两个 div 或文本之间切换
- css - 我如何使 Tailwind.css 响应?
- arrays - 如何正确初始化 C 结构数组
- javascript - 如何使用 javascript fetch 获取部分内容?