首页 > 解决方案 > 如何实现轮询异步 fn 的 Future 或 Stream?

问题描述

我有一个Test我想实现的结构std::future::Future,它会轮询function

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

struct Test;

impl Test {
    async fn function(&mut self) {}
}

impl Future for Test {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.function() {
            Poll::Pending => Poll::Pending,
            Poll::Ready(_) => Poll::Ready(()),
        }
    }
}

那没有用:

error[E0308]: mismatched types
  --> src/lib.rs:17:13
   |
10 |     async fn function(&mut self) {}
   |                                  - the `Output` of this `async fn`'s expected opaque type
...
17 |             Poll::Pending => Poll::Pending,
   |             ^^^^^^^^^^^^^ expected opaque type, found enum `Poll`
   |
   = note: expected opaque type `impl Future`
                     found enum `Poll<_>`

error[E0308]: mismatched types
  --> src/lib.rs:18:13
   |
10 |     async fn function(&mut self) {}
   |                                  - the `Output` of this `async fn`'s expected opaque type
...
18 |             Poll::Ready(_) => Poll::Ready(()),
   |             ^^^^^^^^^^^^^^ expected opaque type, found enum `Poll`
   |
   = note: expected opaque type `impl Future`
                     found enum `Poll<_>`

我知道function必须调用一次,返回的Future必须存储在结构中的某个位置,然后必须轮询保存的未来。我试过这个:

struct Test(Option<Box<Pin<dyn Future<Output = ()>>>>);

impl Test {
    async fn function(&mut self) {}
    fn new() -> Self {
        let mut s = Self(None);
        s.0 = Some(Box::pin(s.function()));
        s
    }
}

那也没有用:

error[E0277]: the size for values of type `(dyn Future<Output = ()> + 'static)` cannot be known at compilation time
   --> src/lib.rs:7:13
    |
7   | struct Test(Option<Box<Pin<dyn Future<Output = ()>>>>);
    |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ doesn't have a size known at compile-time
    |
    = help: the trait `Sized` is not implemented for `(dyn Future<Output = ()> + 'static)`

在我打电话后,function()&mut参考了Test,因此我无法更改Test变量,因此无法将返回的内容存储FutureTest.

我确实得到了一个不安全的解决方案(受此启发)

struct Test<'a>(Option<BoxFuture<'a, ()>>);

impl Test<'_> {
    async fn function(&mut self) {
        println!("I'm alive!");
    }

    fn new() -> Self {
        let mut s = Self(None);
        s.0 = Some(unsafe { &mut *(&mut s as *mut Self) }.function().boxed());
        s
    }
}

impl Future for Test<'_> {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.0.as_mut().unwrap().poll_unpin(cx)
    }
}

我希望有另一种方法。

标签: asynchronousrustasync-awaitfutureunsafe

解决方案


尽管有时您可能想做与您在这里尝试完成的事情类似的事情,但它们很少见。因此,大多数阅读本文的人,甚至可能是 OP,可能希望重组结构状态和用于单个异步执行的数据是不同的对象。

要回答您的问题,是的,这是有可能的。除非您绝对想使用不安全的代码,否则您将需要使用MutexArc. 您希望在 中操作的所有字段async fn都必须包含在 a 中Mutex,并且函数本身将接受Arc<Self>.

但是,我必须强调,这不是一个漂亮的解决方案,您可能不想这样做。根据您的具体情况,您的解决方案可能会有所不同,但我对 OP 在使用Streams 时试图完成的事情的猜测最好通过类似于我写的这个 gist的东西来解决。

use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
};

struct Test {
    state: Mutex<Option<Pin<Box<dyn Future<Output = ()>>>>>,
    // if available use your async library's Mutex to `.await` locks on `buffer` instead
    buffer: Mutex<Vec<u8>>,
}

impl Test {
    async fn function(self: Arc<Self>) {
        for i in 0..16u8 {
            let data: Vec<u8> = vec![i]; // = fs::read(&format("file-{}.txt", i)).await.unwrap();
            let mut buflock = self.buffer.lock().unwrap();
            buflock.extend_from_slice(&data);
        }
    }
    pub fn new() -> Arc<Self> {
        let s = Arc::new(Self {
            state: Default::default(),
            buffer: Default::default(),
        });

        {
            // start by trying to aquire a lock to the Mutex of the Box
            let mut lock = s.state.lock().unwrap();
            // create boxed future
            let b = Box::pin(s.clone().function());
            // insert value into the mutex
            *lock = Some(b);
        } // block causes the lock to be released

        s
    }
}

impl Future for Test {
    type Output = ();
    fn poll(
        self: std::pin::Pin<&mut Self>,
        ctx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<<Self as std::future::Future>::Output> {
        let mut lock = self.state.lock().unwrap();
        let fut: &mut Pin<Box<dyn Future<Output = ()>>> = lock.as_mut().unwrap();
        Future::poll(fut.as_mut(), ctx)
    }
}

推荐阅读