rust - 推送 SelectAll 流
问题描述
我想要一个结构来处理一些流。所有流共享同一个 Item。逻辑如下:我需要创建一个独特的流,其中包含来自所有其他流的所有项目。我还需要将“新”流添加到“主”流中。我不在乎下一个项目来自哪个流。
为此,我看到select_all
了应该执行上述逻辑的函数。
pub struct WsPool {
merge: Arc<Mutex<SelectAll<Box<dyn Stream<Item=MyItem> + Send + 'static>>>>,
}
impl WsPool {
pub fn new() -> Self {
Self {
merge: Arc::new(Mutex::new(SelectAll::new())),
}
}
pub fn add(&self, s: Box<dyn Stream<Item = MyItem> + Send + 'static>) {
let mut merge = self.merge.lock().unwrap();
merge.push(s);
}
pub async fn process(&self) {
loop {
let mut merge = self.merge.lock().unwrap();
let item = merge.await.next();
}
}
}
但我收到这些错误:
error[E0277]: `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` is not a future
--> src/ws_pool.rs:30:24
|
30 | let item = merge.await.next();
| ^^^^^^^^^^^ `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` is not a future
|
= help: the trait `futures::Future` is not implemented for `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>`
note: required by `futures::Future::poll`
--> /home/allevo/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/future.rs:99:5
|
99 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
error[E0277]: `(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)` cannot be unpinned
--> src/ws_pool.rs:17:40
|
17 | merge: Arc::new(Mutex::new(SelectAll::new())),
| ^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)`
|
= note: consider using `Box::pin`
= note: required because of the requirements on the impl of `futures::Stream` for `Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>`
note: required by `futures::stream::SelectAll::<St>::new`
--> /home/allevo/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.17/src/stream/select_all.rs:47:5
|
47 | pub fn new() -> Self {
| ^^^^^^^^^^^^^^^^^^^^
error[E0599]: the method `push` exists for struct `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>`, but its trait bounds were not satisfied
--> src/ws_pool.rs:24:15
|
24 | merge.push(s);
| ^^^^ method cannot be called on `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` due to unsatisfied trait bounds
|
::: /home/allevo/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:172:1
|
172 | / pub struct Box<
173 | | T: ?Sized,
174 | | #[unstable(feature = "allocator_api", issue = "32838")] A: Allocator = Global,
175 | | >(Unique<T>, A);
| |________________- doesn't satisfy `_: futures::Stream`
|
= note: the following trait bounds were not satisfied:
`Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>: futures::Stream`
我究竟做错了什么?否则我如何存储多个流并对其进行迭代?
解决方案
你的第一个问题是一个简单的混淆。您想要await
下一个值,而不是流本身:
let item = merge.next().await;
产生的错误都是因为SelectAll<Box<dyn Stream + Send + 'static>>
没有实现Stream
。如果你看一下impl Stream for SelectAll
,内部流类型实现是受限的Unpin
。
您可以通过将其添加到边界来解决此问题:
use std::marker::Unpin;
// vvvvv
Arc<Mutex<SelectAll<Box<dyn Stream<Item=MyItem> + Unpin + Send + 'static>>>>
或者更好的解决方案是固定流:
use std::pin::Pin;
// vvv
Arc<Mutex<SelectAll<Pin<Box<dyn Stream<Item=MyItem> + Send + 'static>>>>>
不同的是后者可以接受更多的Stream
类型。您只需Box::pin
在添加它们时使用。
看到它在操场上工作。
推荐阅读
- node.js - 如何从 Node.js/Express.js 将要下载的文件作为后台进程发送给用户?
- javascript - React - 使用对象内部的对象调用函数
- android - Android Studio Database Inspector 始终将数据库显示为“已关闭”
- javascript - 只有一个使用 react-grid-layout 的 highcharts 可以调整大小
- influxdb - 为什么 InfluxDB v2 配置文件不起作用?
- javascript - 在谷歌标签管理器中使用 Javascript Shopify 产品变体
- fonts - 如何在 Jupyter Lab 中更改输出单元格的字体
- java - 裁剪图像与形状
- javascript - 组合数组中的不同对象
- php - 如何从 2 个表中获取匹配的行