asynchronous - 如何将向量作为异步流处理?
问题描述
在我的RSS 阅读器项目中,我想异步阅读我的 RSS 提要。目前,由于此代码块,它们被同步读取
self.feeds = self
.feeds
.iter()
.map(|f| f.read(&self.settings))
.collect::<Vec<Feed>>();
我想让该代码异步,因为它可以让我更好地处理糟糕的 Web 服务器响应。
我知道我可以使用Stream
我可以从我的Vec
usingstream::from_iter(...)
中创建的将代码转换为类似的东西
self.feeds = stream::from_iter(self.feeds.iter())
.map(|f| f.read(&self.settings))
// ???
.collect::<Vec<Feed>>()
}
但是,我有两个问题
- 如何将结果加入一个
Vec
(这是一个同步结构)? - 如何执行该流?我在考虑使用
task::spawn
,但它似乎不起作用......
解决方案
如何执行该流?我正在考虑使用
task::spawn
,但它似乎不起作用
在async/await
世界上,异步代码意味着由执行程序执行,它不是标准库的一部分,而是由第三方 crate 提供,例如tokio
. task::spawn
只安排一个实例async fn
运行,而不是实际运行它。
如何将结果加入 vec(这是一个同步结构)
您的 rss 阅读器的生计似乎是f.read
. 它应该变成一个异步函数。然后 feed 的向量将被映射为 future 的向量,需要轮询完成。
futures
板条箱必须futures::stream::futures_unordered::FuturesUnordered
帮助你做到这一点。FuturesUnordered
本身实现了Stream
trait。然后将此流收集到结果向量中并await
像这样完成:
//# tokio = { version = "0.2.4", features = ["full"] }
//# futures = "0.3.1"
use tokio::time::delay_for;
use futures::stream::StreamExt;
use futures::stream::futures_unordered::FuturesUnordered;
use std::error::Error;
use std::time::{Duration, Instant};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let start = Instant::now();
let feeds = (0..10).collect::<Vec<_>>();
let res = read_feeds(feeds).await;
dbg!(res);
dbg!(start.elapsed());
Ok(())
}
async fn read_feeds(feeds: Vec<u32>) -> Vec<u32> {
feeds.iter()
.map(read_feed)
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>()
.await
}
async fn read_feed(feed: &u32) -> u32 {
delay_for(Duration::from_millis(500)).await;
feed * 2
}
delay_for
是模拟潜在的昂贵操作。它还有助于证明这些读数确实在没有任何明确的线程相关逻辑的情况下同时发生。
这里有一个细微差别。与它的同步对应物不同,读取 rss 提要的结果不再与提要本身的顺序相同,无论哪个返回第一个将在前面。你需要以某种方式处理它。
推荐阅读
- ruby-on-rails - 如何处理 after_create 错误?(尝试创建 webhook)
- r - 如何在保留矩阵结构的情况下在 R 中创建热图矩阵?
- c++ - 为什么不能实例化原子对?
- javascript - 检查自 NodeJS 中上次间隔以来更改的属性
- c# - C# 检查字符串是否等于 const name
- python - 我无法在 Mac 上使用 Python3 安装 Psycopg2。我已经安装了 Python3 和 pip3
- ios - 使用 Button 生成 FlexBox 布局
- javascript - 测试方法被过度指定
- mule - Define global dataweave function in mule 3
- c - 默认情况下,局部变量中有什么?