rust - 如何将 hyper 的 Body 流转换为 Result>?
问题描述
我正在将代码更新到最新版本的超级和期货,但我尝试过的所有东西都错过了某种或另一种实现的特征。
一个不工作的示例游乐场......
extern crate futures; // 0.3.5
extern crate hyper; // 0.13.6
use futures::{future, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use hyper::body;
fn get_body_as_vec<'a>(b: body::Body) -> future::BoxFuture<'a, Result<Vec<String>, hyper::Error>> {
let f = b.and_then(|bytes| {
let s = std::str::from_utf8(&bytes).expect("sends no utf-8");
let mut lines: Vec<String> = Vec::new();
for l in s.lines() {
lines.push(l.to_string());
}
future::ok(lines)
});
Box::pin(f)
}
这会产生错误:
error[E0277]: the trait bound `futures::stream::AndThen<hyper::Body, futures::future::Ready<std::result::Result<std::vec::Vec<std::string::String>, hyper::Error>>, [closure@src/lib.rs:8:24: 15:6]>: futures::Future` is not satisfied
--> src/lib.rs:17:5
|
17 | Box::pin(f)
| ^^^^^^^^^^^ the trait `futures::Future` is not implemented for `futures::stream::AndThen<hyper::Body, futures::future::Ready<std::result::Result<std::vec::Vec<std::string::String>, hyper::Error>>, [closure@src/lib.rs:8:24: 15:6]>`
|
= note: required for the cast to the object type `dyn futures::Future<Output = std::result::Result<std::vec::Vec<std::string::String>, hyper::Error>> + std::marker::Send`
我无法创造一个兼容的未来。Body
是一个流,我找不到任何实现了所需特征的“转换器”函数。
在 hyper 0.12 中,我使用了concat2()
.
解决方案
请注意,此函数使用接收流并返回它的包装版本。
要处理整个流并返回代表成功或错误的单个未来,请
try_for_each
改用。
是的,你f
仍然是一个流,try_for_each
将作为参考建议工作,但try_fold
将字节表示为向量中的行,但正如@Shepmaster 在评论中指出的那样,这是一个更好的选择;如果我们直接将块转换为 UTF-8,我们可能会失去响应中多字节字符的完整性。
由于数据的一致性,最简单的解决方案可能是在转换为 UTF-8 之前收集所有字节。
use futures::{future, FutureExt, TryStreamExt};
use hyper::body;
fn get_body_as_vec<'a>(b: body::Body) -> future::BoxFuture<'a, Result<Vec<String>>> {
let f = b
.try_fold(vec![], |mut vec, bytes| {
vec.extend_from_slice(&bytes);
future::ok(vec)
})
.map(|x| {
Ok(std::str::from_utf8(&x?)?
.lines()
.map(ToString::to_string)
.collect())
});
Box::pin(f)
}
您可以使用channel
from hyper Body 来测试多块行为。这是我在块场景中创建的行分区,这将适用于上面的代码,但如果您直接处理块,您将失去一致性。
let (mut sender, body) = body::Body::channel();
tokio::spawn(async move {
sender
.send_data("Line1\nLine2\nLine3\nLine4\nLine5".into())
.await;
sender
.send_data("next bytes of Line5\nLine6\nLine7\nLine8\n----".into())
.await;
});
println!("{:?}", get_body_as_vec(body).await);
- 游乐场(成功场景)
- Playground(失败场景:“Line5 的下一个字节”将表示为新行
Vec
)
注意:我已经使用std::error:Error
了作为返回类型,因为两者都hyper::Error
实现FromUtf8Error
了,你仍然可以使用你的expect
策略hyper::Error
。
推荐阅读
- sql-server-2014 - 如何屏蔽某些特定用户的字段值?
- sql - 用于将日期(dd/mm/yyyy 转换为 yyyyQ1)的 SQLite 代码在 BigQuery SQL 编辑器中不起作用
- javascript - 如何使用 POST 保存从 NodeJS 发送的文件
- amazon-web-services - AWS VPC 客户端终端节点
- integration - 如何在主干应用程序中集成 Lokalise 的复数?
- r - 从比较过程中生成数据框
- html - 如何修复我的媒体查询以更改文本大小?
- javascript - Vue从数据中的函数访问数据
- java - Apache flink 分区
- powershell - 使当前项目 ($_ / $PSItem) 可用于模块函数中的脚本块参数