java - 将 CompletableStage 与最终的错误传播动态结合
问题描述
我需要CompletionStages
动态组合(基于计算结果)以免阻塞执行,最终我需要捕获操作期间可能出现的异常,以便小心关闭执行。
我已经实现了以下内容:
public CompletableFuture<Data> getData() {
final Data accumulator = new Data();
CompletableFuture<Data> result = new CompletableFuture<>();
CompletableStage exec = ... //starting execution
exec.thenComposeAsync(
(res) -> process(accumulator, res)
).thenAccept(t -> result.complete(accumulator));
return result;
}
private CompletionStage<Void> process(Data acc, Result res) {
res.data().forEach(
currData -> {
add.addData(currData);
}
);
if (res.hasMoreData()) {
return res.fetchNextData().thenComposeAsync(
(nextData) -> process(acc, nextData)
);
}
return CompletableFuture.completedFuture(null);
}
我不知道这是否是实施解决方案的最佳方式,但如果一切正常,它就可以工作。当由于任何原因在块中出现异常时,问题就出现了forEach
,错误不会传播回getData
调用者,所以我无法用exceptionally
方法捕获它,以便以安全的方式停止我的应用程序。我想我做错了什么。
解决方案
当传递给的函数thenComposeAsync
异常失败时,返回的futurethenComposeAsync
会异常完成。这会导致由链式普通操作创建的期货也异常完成,而无需评估它们的功能。
规则有三个例外,exceptionally
仅在异常完成后评估以产生替换值,而在任何一种情况下都评估handle
和。whenComplete
所以当你想用一个回退值替换异常时,你可以使用
exec.thenComposeAsync(res -> process(accumulator, res))
.exceptionally(throwable -> fallBack)
.thenAccept(t -> result.complete(accumulator));
必须注意链接exceptionally
before thenAccept
,否则传递给的函数thenAccept
在例外情况下不会被评估。
当您想将异常传播到result
未来时,您可以使用
exec.thenComposeAsync(res -> process(accumulator, res))
.whenComplete((value, throwable) -> {
if(throwable == null) result.complete(accumulator);
else result.completeExceptionally(throwable);
});
throwable
检查反对null
以确定完成是否异常至关重要,因为value
可以null
作为普通结果。即使在普通结果值永远不可能的情况下null
,也建议坚持惯用的解决方案,因为您不知道是否以及何时重用代码。