首页 > 解决方案 > 如何从 FuturesUnordered 返回错误?

问题描述

我有一组要并行运行的期货,如果一个失败,我想让错误返回给调用者。

这是我到目前为止一直在测试的内容:

use futures::prelude::*;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::{future, Future};

fn main() {
    let tasks: FuturesUnordered<_> = (1..10).map(|_| async_func(false)).collect();

    let mut runtime = tokio::runtime::Runtime::new().expect("Unable to start runtime");
    let res = runtime.block_on(tasks.into_future());

    if let Err(_) = res {
        println!("err");
    }
}

fn async_func(success: bool) -> impl Future<Item = (), Error = String> {
    if success {
        future::ok(())
    } else {
        future::err("Error".to_string())
    }
}

如何从任何失败的期货中获取错误?如果单个期货失败,更好的办法是停止运行任何未决期货。

标签: rustrust-tokio

解决方案


您的代码已经返回并处理错误。如果您尝试使用该错误,编译器将快速引导您找到解决方案:

if let Err(e) = res {
    println!("err: {}", e);
}
error[E0277]: `(std::string::String, futures::stream::futures_unordered::FuturesUnordered<impl futures::future::Future>)` doesn't implement `std::fmt::Display`
  --> src/main.rs:12:29
   |
12 |         println!("err: {}", e);
   |                             ^ `(std::string::String, futures::stream::futures_unordered::FuturesUnordered<impl futures::future::Future>)` cannot be formatted with the default formatter
   |
   = help: the trait `std::fmt::Display` is not implemented for `(std::string::String, futures::stream::futures_unordered::FuturesUnordered<impl futures::future::Future>)`
   = note: in format strings you may be able to use `{:?}` (or {:#?} for pretty-print) instead
   = note: required by `std::fmt::Display::fmt`

Err值是您的错误和处理错误后继续提取的原始流的元组。这就是Stream::into_future/StreamFuture所做的。

访问元组中的第一个值以获取错误:

if let Err((e, _)) = res {
    println!("err: {}", e);
}

如果您想查看所有值,您可以一遍又一遍地轮询流(但不要这样做,因为它可能效率低下):

let mut f = tasks.into_future();
loop {
    match runtime.block_on(f) {
        Ok((None, _)) => {
            println!("Stream complete");
            break;
        }
        Ok((Some(v), next)) => {
            println!("Success: {:?}", v);
            f = next.into_future();
        }
        Err((e, next)) => {
            println!("Error: {:?}", e);
            f = next.into_future();
        }
    }
}

推荐阅读