rust - 创建连接到共享状态的循环 Tokio 流
问题描述
我遇到了一个我不太了解的问题,并希望有人能够看到我误解的内容。
问题很简单:我有一个全局状态(在多个任务之间共享)并且希望在全局状态下的向量上有一个无限循环。然后,我将使用间隔流压缩它,从而定期发出流中的下一个值。
如果状态中的向量发生变化,无限流应该只是重新加载向量并开始从新向量开始读取,并丢弃旧数组。
这是到目前为止我得到的代码,问题在帖子的末尾。
use futures::stream::Stream;
use futures::{Async, Poll};
use std::iter::Cycle;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::timer::Interval;
我们定义了一个全局状态,其中包含一个可以更新的数组。每当数组更新时,我们将步进版本并设置数组。
struct State<T> {
version: u32,
array: Vec<T>,
}
impl<T> State<T> {
fn new(array: Vec<T>) -> Self {
Self {
version: 0,
array: Vec::new(),
}
}
fn update(&mut self, array: Vec<T>) {
self.version += 1;
self.array = array;
}
}
现在,我们在状态上创建一个流。初始化时,它将从状态中读取数组和版本并将其存储,然后在std::iter::Cycle
内部保留一个将在数组上循环的实例。
struct StateStream<I> {
state: Arc<Mutex<State<I::Item>>>,
version: u32,
iter: Cycle<I>,
}
impl<I> StateStream<I>
where
I: Iterator,
{
fn new(state: Arc<Mutex<State<I::Item>>>) -> Self {
let (version, array) = {
let locked_state = state.lock().unwrap();
(locked_state.version, locked_state.array)
};
Self {
state: state,
version: version,
iter: array.iter().cycle(),
}
}
}
我们现在为StateStream
. 每次轮询时,它都会检查状态的版本是否发生了变化,如果发生了变化,则重新加载数组和版本。
然后我们将从迭代器中获取下一个项目并返回它。
impl<I> Stream for StateStream<I>
where
I: Iterator + Clone,
{
type Item = I::Item;
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let locked_state = self.state.lock().unwrap();
if locked_state.version > self.version {
self.iter = locked_state.array.clone().iter().cycle();
self.version = locked_state.version;
}
Ok(Async::Ready(self.iter.next()))
}
}
主程序如下所示。我不在这里更新向量,但这对于手头的情况并不重要。
fn main() {
let state = Arc::new(Mutex::new(State::new(vec![2, 3, 5, 7, 11, 13])));
let primes = StateStream::new(state)
.take(20)
.zip(
Interval::new(Instant::now(), Duration::from_millis(500))
.map_err(|err| println!("Error: {}", err)),
)
.for_each(|(number, instant)| {
println!("fire; number={}, instant={:?}", number, instant);
Ok(())
});
tokio::run(primes);
}
编译时,我收到以下错误:
cargo run --example cycle_stream_shared
Compiling tokio-testing v0.1.0 (/home/mats/crates/tokio-examples)
error[E0308]: mismatched types
--> examples/cycle_stream_shared.rs:66:19
|
66 | iter: array.iter().cycle(),
| ^^^^^^^^^^^^^^^^^^^^ expected type parameter, found struct `std::slice::Iter`
|
= note: expected type `std::iter::Cycle<I>`
found type `std::iter::Cycle<std::slice::Iter<'_, <I as std::iter::Iterator>::Item>>`
error[E0308]: mismatched types
--> examples/cycle_stream_shared.rs:81:25
|
81 | self.iter = locked_state.array.clone().iter().cycle();
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected type parameter, found struct `std::slice::Iter`
|
= note: expected type `std::iter::Cycle<I>`
found type `std::iter::Cycle<std::slice::Iter<'_, <I as std::iter::Iterator>::Item>>`
error: aborting due to 2 previous errors
For more information about this error, try `rustc --explain E0308`.
error: Could not compile `tokio-testing`.
To learn more, run the command again with --verbose.
现在,错误和解释表明无法派生具体类型,但在这种情况下,我使用的是泛型结构
Cycle<I>
并希望I
实例化为std::slice::Iter<'_,
I::Item>
. 由于std::slice::Iter
已经实现Iterator
,并且类型已经实现了所有必要的特征来匹配。
存在一些类似问题的答案,但似乎没有什么与这种情况相匹配:
泛型结构的构造函数中的“预期类型参数”错误表明类型不匹配(与解释相同),因为泛型结构定义允许任何类型,但构造需要特定类型。
在这种情况下,我们使用的是泛型类型
Cycle<I>
,I
应该在哪里实现特征,并Iterator
尝试使用实现的类型。std::slice::Iter<..>
Iterator
如何从方法返回特征的实例?说说如何返回一个匹配一个trait的任意类型,这里不是这样。
其他问题主要是指这两个,或这些的变体。
更新:将其更改为泛型类型以证明它仍然不起作用。
解决方案
推荐阅读
- mysql - 将 wordpress db 从 godaddy phpmyadmin 导出到本地 mamp phpmyadmin 显示 1062 键主键重复条目
- python - SciPy stats Gamma PDF - 无法成功遮蔽 PDF 曲线下的区域
- vba - VBA - 合并排序和去重的列表项
- sql - 日期比较和差异
- google-cloud-bigtable - BigTable 是否允许设置微时间戳粒度
- llvm - 如何在 libmvec glibc 的短向量数学库中使用 clang
- c# - 如何在 KendoGrid UI Jquery 中自定义排序?
- docker - 通过 nginx websocket 代理连接时 WebApp 崩溃
- dji-sdk - DJI SDK - 运行连续时间线任务时导航模式未打开
- php - 创建 PHP CURL 中继