首页 > 解决方案 > 推送 SelectAll 流

问题描述

我想要一个结构来处理一些流。所有流共享同一个 Item。逻辑如下:我需要创建一个独特的流,其中包含来自所有其他流的所有项目。我还需要将“新”流添加到“主”流中。我不在乎下一个项目来自哪个流。

为此,我看到select_all了应该执行上述逻辑的函数。

pub struct WsPool {
    merge: Arc<Mutex<SelectAll<Box<dyn Stream<Item=MyItem> + Send + 'static>>>>,
}

impl WsPool {
    pub fn new() -> Self {
        Self {
            merge: Arc::new(Mutex::new(SelectAll::new())),
        }
    }

    pub fn add(&self, s: Box<dyn Stream<Item = MyItem> + Send + 'static>) {
        let mut merge  = self.merge.lock().unwrap();

        merge.push(s);
    }

    pub async fn process(&self) {
        loop {
            let mut merge = self.merge.lock().unwrap();
            let item = merge.await.next();
        }
    }
}

但我收到这些错误:

error[E0277]: `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` is not a future
  --> src/ws_pool.rs:30:24
   |
30 |             let item = merge.await.next();
   |                        ^^^^^^^^^^^ `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` is not a future
   |
   = help: the trait `futures::Future` is not implemented for `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>`
note: required by `futures::Future::poll`
  --> /home/allevo/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/future.rs:99:5
   |
99 |     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error[E0277]: `(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)` cannot be unpinned
  --> src/ws_pool.rs:17:40
   |
17 |             merge: Arc::new(Mutex::new(SelectAll::new())),
   |                                        ^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)`
   |
   = note: consider using `Box::pin`
   = note: required because of the requirements on the impl of `futures::Stream` for `Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>`
note: required by `futures::stream::SelectAll::<St>::new`
  --> /home/allevo/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.17/src/stream/select_all.rs:47:5
   |
47 |     pub fn new() -> Self {
   |     ^^^^^^^^^^^^^^^^^^^^

error[E0599]: the method `push` exists for struct `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>`, but its trait bounds were not satisfied
   --> src/ws_pool.rs:24:15
    |
24  |           merge.push(s);
    |                 ^^^^ method cannot be called on `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` due to unsatisfied trait bounds
    |
   ::: /home/allevo/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:172:1
    |
172 | / pub struct Box<
173 | |     T: ?Sized,
174 | |     #[unstable(feature = "allocator_api", issue = "32838")] A: Allocator = Global,
175 | | >(Unique<T>, A);
    | |________________- doesn't satisfy `_: futures::Stream`
    |
    = note: the following trait bounds were not satisfied:
            `Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>: futures::Stream`

我究竟做错了什么?否则我如何存储多个流并对其进行迭代?

标签: rustrust-futures

解决方案


你的第一个问题是一个简单的混淆。您想要await一个值,而不是流本身:

let item = merge.next().await;

产生的错误都是因为SelectAll<Box<dyn Stream + Send + 'static>>没有实现Stream。如果你看一下impl Stream for SelectAll,内部流类型实现是受限的Unpin

您可以通过将其添加到边界来解决此问题:

use std::marker::Unpin;
                                               // vvvvv
Arc<Mutex<SelectAll<Box<dyn Stream<Item=MyItem> + Unpin + Send + 'static>>>>

或者更好的解决方案是固定流:

use std::pin::Pin;
                 // vvv
Arc<Mutex<SelectAll<Pin<Box<dyn Stream<Item=MyItem> + Send + 'static>>>>>

不同的是后者可以接受更多的Stream类型。您只需Box::pin在添加它们时使用。

看到它在操场上工作。


推荐阅读