首页 > 解决方案 > 如何将向量作为异步流处理?

问题描述

在我的RSS 阅读器项目中,我想异步阅读我的 RSS 提要。目前,由于此代码块,它们被同步读取

self.feeds = self
    .feeds
    .iter()
    .map(|f| f.read(&self.settings))
    .collect::<Vec<Feed>>();

我想让该代码异步,因为它可以让我更好地处理糟糕的 Web 服务器响应。

我知道我可以使用Stream我可以从我的Vecusingstream::from_iter(...)中创建的将代码转换为类似的东西

self.feeds = stream::from_iter(self.feeds.iter())
    .map(|f| f.read(&self.settings))
    // ???
    .collect::<Vec<Feed>>()
}

但是,我有两个问题

  1. 如何将结果加入一个Vec(这是一个同步结构)?
  2. 如何执行该流?我在考虑使用task::spawn,但它似乎不起作用......

标签: asynchronousrustasync-await

解决方案


如何执行该流?我正在考虑使用task::spawn,但它似乎不起作用

async/await世界上,异步代码意味着由执行程序执行,它不是标准库的一部分,而是由第三方 crate 提供,例如tokio. task::spawn只安排一个实例async fn运行,而不是实际运行它。

如何将结果加入 vec(这是一个同步结构)

您的 rss 阅读器的生计似乎是f.read. 它应该变成一个异步函数。然后 feed 的向量将被映射为 future 的向量,需要轮询完成。

futures板条箱必须futures::stream::futures_unordered::FuturesUnordered帮助你做到这一点。FuturesUnordered本身实现了Streamtrait。然后将此流收集到结果向量中并await像这样完成:

//# tokio = { version = "0.2.4", features = ["full"] }
//# futures = "0.3.1"
use tokio::time::delay_for;
use futures::stream::StreamExt;
use futures::stream::futures_unordered::FuturesUnordered;
use std::error::Error;
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let start = Instant::now();
    let feeds = (0..10).collect::<Vec<_>>();
    let res = read_feeds(feeds).await;
    dbg!(res);
    dbg!(start.elapsed());

    Ok(())
}

async fn read_feeds(feeds: Vec<u32>) -> Vec<u32> {
    feeds.iter()
        .map(read_feed)
        .collect::<FuturesUnordered<_>>()
        .collect::<Vec<_>>()
        .await
}

async fn read_feed(feed: &u32) -> u32 {
    delay_for(Duration::from_millis(500)).await;

    feed * 2
}

delay_for是模拟潜在的昂贵操作。它还有助于证明这些读数确实在没有任何明确的线程相关逻辑的情况下同时发生。

这里有一个细微差别。与它的同步对应物不同,读取 rss 提要的结果不再与提要本身的顺序相同,无论哪个返回第一个将在前面。你需要以某种方式处理它。


推荐阅读