首页 > 解决方案 > 如何将 hyper 的 Body 流转换为 Result>?

问题描述

我正在将代码更新到最新版本的超级和期货,但我尝试过的所有东西都错过了某种或另一种实现的特征。

一个不工作的示例游乐场......

extern crate futures; // 0.3.5
extern crate hyper; // 0.13.6

use futures::{future, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use hyper::body;

fn get_body_as_vec<'a>(b: body::Body) -> future::BoxFuture<'a, Result<Vec<String>, hyper::Error>> {
    let f = b.and_then(|bytes| {
        let s = std::str::from_utf8(&bytes).expect("sends no utf-8");
        let mut lines: Vec<String> = Vec::new();
        for l in s.lines() {
            lines.push(l.to_string());
        }
        future::ok(lines)
    });

    Box::pin(f)
}

这会产生错误:

error[E0277]: the trait bound `futures::stream::AndThen<hyper::Body, futures::future::Ready<std::result::Result<std::vec::Vec<std::string::String>, hyper::Error>>, [closure@src/lib.rs:8:24: 15:6]>: futures::Future` is not satisfied
  --> src/lib.rs:17:5
   |
17 |     Box::pin(f)
   |     ^^^^^^^^^^^ the trait `futures::Future` is not implemented for `futures::stream::AndThen<hyper::Body, futures::future::Ready<std::result::Result<std::vec::Vec<std::string::String>, hyper::Error>>, [closure@src/lib.rs:8:24: 15:6]>`
   |
   = note: required for the cast to the object type `dyn futures::Future<Output = std::result::Result<std::vec::Vec<std::string::String>, hyper::Error>> + std::marker::Send`

我无法创造一个兼容的未来。Body是一个流,我找不到任何实现了所需特征的“转换器”函数。

在 hyper 0.12 中,我使用了concat2().

标签: rustfuturehyper

解决方案


and_then 的参考

请注意,此函数使用接收流并返回它的包装版本。

要处理整个流并返回代表成功或错误的单个未来,请try_for_each改用。

是的,你f仍然是一个流,try_for_each将作为参考建议工作,但try_fold将字节表示为向量中的行,但正如@Shepmaster 在评论中指出的那样,这是一个更好的选择;如果我们直接将块转换为 UTF-8,我们可能会失去响应中多字节字符的完整性。

由于数据的一致性,最简单的解决方案可能是在转换为 UTF-8 之前收集所有字节。

use futures::{future, FutureExt, TryStreamExt};
use hyper::body;

fn get_body_as_vec<'a>(b: body::Body) -> future::BoxFuture<'a, Result<Vec<String>>> {
    let f = b
        .try_fold(vec![], |mut vec, bytes| {
            vec.extend_from_slice(&bytes);
            future::ok(vec)
        })
        .map(|x| {
            Ok(std::str::from_utf8(&x?)?
                .lines()
                .map(ToString::to_string)
                .collect())
        });

    Box::pin(f)
}

操场


您可以使用channelfrom hyper Body 来测试多块行为。这是我在块场景中创建的行分区,这将适用于上面的代码,但如果您直接处理块,您将失去一致性。

let (mut sender, body) = body::Body::channel();

tokio::spawn(async move {
    sender
        .send_data("Line1\nLine2\nLine3\nLine4\nLine5".into())
        .await;
    sender
        .send_data("next bytes of Line5\nLine6\nLine7\nLine8\n----".into())
        .await;
});

println!("{:?}", get_body_as_vec(body).await);
  • 游乐场(成功场景)
  • Playground(失败场景:“Line5 的下一个字节”将表示为新行Vec

注意:我已经使用std::error:Error了作为返回类型,因为两者都hyper::Error实现FromUtf8Error了,你仍然可以使用你的expect策略hyper::Error


推荐阅读