首页 > 解决方案 > 将 CompletableStage 与最终的错误传播动态结合

问题描述

我需要CompletionStages动态组合(基于计算结果)以免阻塞执行,最终我需要捕获操作期间可能出现的异常,以便小心关闭执行。

我已经实现了以下内容:


public CompletableFuture<Data> getData() {

    final Data accumulator = new Data();

    CompletableFuture<Data> result = new CompletableFuture<>();

    CompletableStage exec = ... //starting execution

    exec.thenComposeAsync(
                    (res) -> process(accumulator, res)
            ).thenAccept(t -> result.complete(accumulator));
    return result;
  }

  private CompletionStage<Void> process(Data acc, Result res) {

    res.data().forEach(
            currData -> {
              add.addData(currData);
            }
    );
    if (res.hasMoreData()) {
      return res.fetchNextData().thenComposeAsync(
              (nextData) -> process(acc, nextData)
      );
    }

    return CompletableFuture.completedFuture(null);

  }

我不知道这是否是实施解决方案的最佳方式,但如果一切正常,它就可以工作。当由于任何原因在块中出现异常时,问题就出现了forEach,错误不会传播回getData调用者,所以我无法用exceptionally方法捕获它,以便以安全的方式停止我的应用程序。我想我做错了什么。

标签: javacompletable-future

解决方案


当传递给的函数thenComposeAsync异常失败时,返回的futurethenComposeAsync会异常完成。这会导致由链式普通操作创建的期货也异常完成,而无需评估它们的功能。

规则有三个例外,exceptionally仅在异常完成后评估以产生替换值,而在任何一种情况下都评估handle和。whenComplete

所以当你想用一个回退值替换异常时,你可以使用

exec.thenComposeAsync(res -> process(accumulator, res))
    .exceptionally(throwable -> fallBack)
    .thenAccept(t -> result.complete(accumulator));

必须注意链接exceptionallybefore thenAccept,否则传递给的函数thenAccept在例外情况下不会被评估。

当您想将异常传播到result未来时,您可以使用

exec.thenComposeAsync(res -> process(accumulator, res))
    .whenComplete((value, throwable) -> {
         if(throwable == null) result.complete(accumulator);
         else result.completeExceptionally(throwable);
    });

throwable检查反对null以确定完成是否异常至关重要,因为value可以null作为普通结果。即使在普通结果值永远不可能的情况下null,也建议坚持惯用的解决方案,因为您不知道是否以及何时重用代码。


推荐阅读