首页 > 解决方案 > 创建连接到共享状态的循环 Tokio 流

问题描述

我遇到了一个我不太了解的问题,并希望有人能够看到我误解的内容。

问题很简单:我有一个全局状态(在多个任务之间共享)并且希望在全局状态下的向量上有一个无限循环。然后,我将使用间隔流压缩它,从而定期发出流中的下一个值。

如果状态中的向量发生变化,无限流应该只是重新加载向量并开始从新向量开始读取,并丢弃旧数组。

这是到目前为止我得到的代码,问题在帖子的末尾。

use futures::stream::Stream;
use futures::{Async, Poll};
use std::iter::Cycle;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::timer::Interval;

我们定义了一个全局状态,其中包含一个可以更新的数组。每当数组更新时,我们将步进版本并设置数组。

struct State<T> {
    version: u32,
    array: Vec<T>,
}

impl<T> State<T> {
    fn new(array: Vec<T>) -> Self {
        Self {
            version: 0,
            array: Vec::new(),
        }
    }

    fn update(&mut self, array: Vec<T>) {
        self.version += 1;
        self.array = array;
    }
}

现在,我们在状态上创建一个流。初始化时,它将从状态中读取数组和版本并将其存储,然后在std::iter::Cycle内部保留一个将在数组上循环的实例。

struct StateStream<I> {
    state: Arc<Mutex<State<I::Item>>>,
    version: u32,
    iter: Cycle<I>,
}

impl<I> StateStream<I>
where
    I: Iterator,
{
    fn new(state: Arc<Mutex<State<I::Item>>>) -> Self {
        let (version, array) = {
            let locked_state = state.lock().unwrap();
            (locked_state.version, locked_state.array)
        };

        Self {
            state: state,
            version: version,
            iter: array.iter().cycle(),
        }
    }
}   

我们现在为StateStream. 每次轮询时,它都会检查状态的版本是否发生了变化,如果发生了变化,则重新加载数组和版本。

然后我们将从迭代器中获取下一个项目并返回它。

impl<I> Stream for StateStream<I>
where
    I: Iterator + Clone,
{
    type Item = I::Item;
    type Error = ();

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let locked_state = self.state.lock().unwrap();
        if locked_state.version > self.version {
            self.iter = locked_state.array.clone().iter().cycle();
            self.version = locked_state.version;
        }
        Ok(Async::Ready(self.iter.next()))
    }
}

主程序如下所示。我不在这里更新向量,但这对于手头的情况并不重要。

fn main() {
    let state = Arc::new(Mutex::new(State::new(vec![2, 3, 5, 7, 11, 13])));
    let primes = StateStream::new(state)
        .take(20)
        .zip(
            Interval::new(Instant::now(), Duration::from_millis(500))
                .map_err(|err| println!("Error: {}", err)),
        )
        .for_each(|(number, instant)| {
            println!("fire; number={}, instant={:?}", number, instant);
            Ok(())
        });
    tokio::run(primes);
}

编译时,我收到以下错误:

cargo run --example cycle_stream_shared
   Compiling tokio-testing v0.1.0 (/home/mats/crates/tokio-examples)
error[E0308]: mismatched types
  --> examples/cycle_stream_shared.rs:66:19
   |
66 |             iter: array.iter().cycle(),
   |                   ^^^^^^^^^^^^^^^^^^^^ expected type parameter, found struct `std::slice::Iter`
   |
   = note: expected type `std::iter::Cycle<I>`
              found type `std::iter::Cycle<std::slice::Iter<'_, <I as std::iter::Iterator>::Item>>`

error[E0308]: mismatched types
  --> examples/cycle_stream_shared.rs:81:25
   |
81 |             self.iter = locked_state.array.clone().iter().cycle();
   |                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected type parameter, found struct `std::slice::Iter`
   |
   = note: expected type `std::iter::Cycle<I>`
              found type `std::iter::Cycle<std::slice::Iter<'_, <I as std::iter::Iterator>::Item>>`

error: aborting due to 2 previous errors

For more information about this error, try `rustc --explain E0308`.
error: Could not compile `tokio-testing`.

To learn more, run the command again with --verbose.

现在,错误和解释表明无法派生具体类型,但在这种情况下,我使用的是泛型结构 Cycle<I>并希望I实例化为std::slice::Iter<'_, I::Item>. 由于std::slice::Iter已经实现Iterator,并且类型已经实现了所有必要的特征来匹配。

存在一些类似问题的答案,但似乎没有什么与这种情况相匹配:

其他问题主要是指这两个,或这些的变体。

更新:将其更改为泛型类型以证明它仍然不起作用。

标签: ruststreamrust-tokio

解决方案


推荐阅读