首页 > 解决方案 > 如何在 Tokio 的 Sink 中使用 Pin?

问题描述

我想创建一个在实现 Tokio's 的结构上发送一些数据的方法Sink,但是我在使用Pinself. 本质上,我需要这样的东西:

fn send_data(&mut self, data: Item, cx: &mut Context) -> Poll<Result<(), Error>> {
    futures_core::ready!(something.poll_ready(cx))?;
    something.start_send(data)?;
    futures_core::ready!(something.poll_close(cx))
}

问题是每次调用poll_ready(),start_send()poll_close()接受self: Pin<&mut Self>,我不知道something我的用例应该是什么。如果我尝试使用let something = Pin::new(self);thensomething在调用之后会被移动,poll_ready()并且我不能将它用于后续调用(此时 self 也消失了)。我该如何解决这个问题?

use futures_core;
use std::pin::Pin;
use tokio::prelude::*; // 0.3.0-alpha.1

struct Test {}

impl Sink<i32> for Test {
    type Error = ();

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn start_send(self: Pin<&mut Self>, item: i32) -> Result<(), Self::Error> {
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}

impl Test {
    fn send_data(&mut self, data: i32, cx: &mut Context) -> Poll<Result<(), Error>> {
        // what should "something" here be?
        futures_core::ready!(something.poll_ready(cx))?;
        something.start_send(data)?;
        futures_core::ready!(something.poll_close(cx))
    }
}

标签: rustfuturerust-tokio

解决方案


我自己对 Rust 很陌生(本周才开始),但我今天早些时候通过执行以下操作解决了一个非常相似的问题:

  1. 创建一个固定版本somethinglet mut something_pinned = Box::pin(something);
  2. .as_mut()在每次调用、poll_readystart_send之前调用poll_close

所以在你的send_data,它应该看起来像:

futures_core::ready!(something_pinned.as_mut().poll_ready(cx))?;
something_pinned.as_mut().start_send(data)?;
futures_core::ready!(something_pinned.as_mut().poll_close(cx))

另外,我认为您通常应该存储您的哪些子轮询步骤已经完成的“记忆”,例如。这样当您的函数被重新调用时,它不会start_send在已经看到它成功/准备就绪后调用额外的时间。(尽管在这种情况下冗余调用它可能不会导致任何实际问题)


推荐阅读