rust - `FuturesUnordered` 不满足`Stream`?
问题描述
我想实现Stream
基于 a 的 a FuturesUnordered
,它再次应该评估async
返回类型为 的函数Result<SomeEnum>
,但为了参数的简单起见,我们假设它只是 a Result<f64>
。由于async fn
s 最终返回Future
s,我假设以下方式是我必须定义我的结构的方式:
use anyhow::Result;
use futures::{Future, Stream, stream::FuturesUnordered};
use std::{pin::Pin, task::Poll};
use pin_project::pin_project;
#[pin_project]
pub struct MyDerivedStream<'a> {
#[pin]
from_futures: FuturesUnordered<&'a (dyn Future<Output = Result<f64>> + Send)>,
}
impl Stream for MyDerivedStream<'_> {
type Item = Result<f64>;
fn poll_next(
self: Pin<&mut Self>,
c: &mut std::task::Context<'_>,
) -> Poll<Option<<Self as Stream>::Item>> {
let this = self.project();
this.from_futures.poll_next(c)
}
}
我现在遇到的问题是,由于某种原因,由于不满足特征界限,poll_next
函数上的函数无法编译。(在这个 Playground 示例中亲自查看):FuturesUnordered
Stream
error[E0599]: the method `poll_next` exists for struct `Pin<&mut FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>>`, but its trait bounds were not satisfied
--> src/lib.rs:21:27
|
21 | this.from_futures.poll_next(c)
| ^^^^^^^^^ method cannot be called on `Pin<&mut FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>>` due to unsatisfied trait bounds
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.14/src/stream/futures_unordered/mod.rs:55:1
|
55 | pub struct FuturesUnordered<Fut> {
| -------------------------------- doesn't satisfy `_: futures::Stream`
|
= note: the following trait bounds were not satisfied:
`&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send: futures::Future`
which is required by `FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>: futures::Stream`
我正在努力理解这里的问题。就我所见FuturesUnordered
,确实实现Stream
了,那么这里的实际问题是什么?是&'a dyn Future
- 如果是这样,我还需要如何在此处定义类型以使其工作?
解决方案
&'a dyn Future
不实现Future
,这是impl Stream for FuturesUnordered
. 一种解决方案是替换&'a dyn Future
为Pin<&mut 'a dyn Future>
:
use anyhow::Result;
use futures::{Future, Stream, stream::FuturesUnordered};
use std::{pin::Pin, task::Poll};
use pin_project::pin_project;
#[pin_project]
pub struct MyDerivedStream<'a> {
#[pin]
from_futures: FuturesUnordered<Pin<&'a mut(dyn Future<Output = Result<f64>> + Send)>>,
}
impl<'a> Stream for MyDerivedStream<'a> {
type Item = Result<f64>;
fn poll_next(
self: Pin<&mut Self>,
c: &mut std::task::Context<'_>,
) -> Poll<Option<<Self as Stream>::Item>> {
let this = self.project().from_futures;
this.poll_next(c)
}
}
有必要可变地借用 中的项目FuturesUnordered
,这通过检查需要的Futures::poll
函数变得明显self: Pin<&mut Self>
。的实现Stream for FuturesUnordered
需要轮询包装的项目以确定何时可以产生新项目,这对于共享引用是不可能的。
如果没有Pin
周围的&mut Future
,则有可能mem::replace
包裹的未来并导致它Future
永远不会被实际轮询。
这是了解有关 Pinning 的更多信息的绝佳资源:https ://fasterthanli.me/articles/pin-and-suffering
推荐阅读
- javascript - Angular - 如何用另一个对象替换对象属性?
- c# - 由于其他触发器,WPF 触发器无法正常工作
- virtual-machine - 将 VMDK 文件转换为压缩的 RAW 文件
- node.js - 修改测试API request-response时express发送的请求对象
- javascript - 从常量数组中获取标签值
- android - __WEBPACK_AMD_DEFINE_ARRAY__ 未定义
- sql - 我如何检查现有记录并避免冗余数据
- xamarin - 使用搜索栏(或条目)呈现的自定义选择器
- c - 在 Eclipse 中对整个 C 项目运行自动格式化程序
- amazon-web-services - 如何在python中处理aws Lambda重试