首页 > 解决方案 > `FuturesUnordered` 不满足`Stream`?

问题描述

我想实现Stream基于 a 的 a FuturesUnordered,它再次应该评估async返回类型为 的函数Result<SomeEnum>,但为了参数的简单起见,我们假设它只是 a Result<f64>。由于async fns 最终返回Futures,我假设以下方式是我必须定义我的结构的方式:

use anyhow::Result;
use futures::{Future, Stream, stream::FuturesUnordered};
use std::{pin::Pin, task::Poll};
use pin_project::pin_project;

#[pin_project]
pub struct MyDerivedStream<'a> {
    #[pin]
    from_futures: FuturesUnordered<&'a (dyn Future<Output = Result<f64>> + Send)>,
}

impl Stream for MyDerivedStream<'_> {
    type Item = Result<f64>;

    fn poll_next(
        self: Pin<&mut Self>,
        c: &mut std::task::Context<'_>,
    ) -> Poll<Option<<Self as Stream>::Item>> {
        let this = self.project();

        this.from_futures.poll_next(c)
    }
}

我现在遇到的问题是,由于某种原因,由于不满足特征界限,poll_next函数上的函数无法编译。(在这个 Playground 示例中亲自查看):FuturesUnorderedStream

error[E0599]: the method `poll_next` exists for struct `Pin<&mut FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>>`, but its trait bounds were not satisfied
  --> src/lib.rs:21:27
   |
21 |         this.from_futures.poll_next(c)
   |                           ^^^^^^^^^ method cannot be called on `Pin<&mut FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>>` due to unsatisfied trait bounds
   | 
  ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.14/src/stream/futures_unordered/mod.rs:55:1
   |
55 | pub struct FuturesUnordered<Fut> {
   | -------------------------------- doesn't satisfy `_: futures::Stream`
   |
   = note: the following trait bounds were not satisfied:
           `&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send: futures::Future`
           which is required by `FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>: futures::Stream`

我正在努力理解这里的问题。就我所见FuturesUnordered,确实实现Stream了,那么这里的实际问题是什么?是&'a dyn Future- 如果是这样,我还需要如何在此处定义类型以使其工作?

标签: ruststreamfuture

解决方案


&'a dyn Future不实现Future,这是impl Stream for FuturesUnordered. 一种解决方案是替换&'a dyn FuturePin<&mut 'a dyn Future>

use anyhow::Result;
use futures::{Future, Stream, stream::FuturesUnordered};
use std::{pin::Pin, task::Poll};
use pin_project::pin_project;

#[pin_project]
pub struct MyDerivedStream<'a> {
    #[pin]
    from_futures: FuturesUnordered<Pin<&'a mut(dyn Future<Output = Result<f64>> + Send)>>,
}

impl<'a> Stream for MyDerivedStream<'a> {
    type Item = Result<f64>;

    fn poll_next(
        self: Pin<&mut Self>,
        c: &mut std::task::Context<'_>,
    ) -> Poll<Option<<Self as Stream>::Item>> {
        let this = self.project().from_futures;

        this.poll_next(c)
    }
}

有必要可变地借用 中的项目FuturesUnordered,这通过检查需要的Futures::poll函数变得明显self: Pin<&mut Self>。的实现Stream for FuturesUnordered需要轮询包装的项目以确定何时可以产生新项目,这对于共享引用是不可能的。

如果没有Pin周围的&mut Future,则有可能mem::replace包裹的未来并导致它Future永远不会被实际轮询。

这是了解有关 Pinning 的更多信息的绝佳资源:https ://fasterthanli.me/articles/pin-and-suffering


推荐阅读