java - Java - CompletableFutures - 如果有异常,我如何取消所有期货
问题描述
我有一个方法(包括在下面)来返回CompletableFuture
s 列表的值。
该方法应该:
- 能够在给定时间后超时。
- 如果异常数量超过 n 个,则能够取消所有期货。
第一点效果很好,并且在超过超时限制后确实会爆炸。(我仍然需要在exectuorService.shutdownNow()
之后打电话才能返回给来电者)。我遇到的问题是我要完成的第二件事。
假设我有一个包含 20,000 个期货的列表,它们都会有一个例外,那么为什么要让它们全部执行,如果我发现有太多例外,那么我认为所有期货都有问题并且我想取消它们。
此外,我希望每个未来单独超时,这可能需要多长时间,但这也行不通,出于下面概述的相同原因,谦虚。
原因似乎是,因为当我调用 时allDoneFuture.thenApply()
,此时它等待并让所有期货完成,无论是成功还是异常。只有在所有这些都完成后,它才会遍历每个未来并获取其结果。到那时,当他们已经完成时,取消有什么好处。
如果有人能告诉我如何实现这一特定需求,我将不胜感激:“监控异常和个别超时,并在此基础上取消所有其他”。
谢谢。
下面是我写的方法:
/**
* @param futures a list of completable futures
* @param timeout how long to allow the futures to run before throwing exception
* @param timeUnit unit of timeout
* @param allowedExceptions how many of the futures do we tolerate exceptions,
* NOTE: if an exception is thrown from the futures it will return null, until it reaches the allowedExceptions threshold
* */
public static <T> List<T> extractFromFutures(List<CompletableFuture<T>> futures, int timeout, TimeUnit timeUnit, int allowedExceptions) {
CompletableFuture<Void> allDoneFuture = CompletableFuture
.allOf(futures.toArray(new CompletableFuture[futures.size()]));
try {
AtomicInteger exceptionCount = new AtomicInteger(0);
return allDoneFuture.thenApply(v ->//when all are done
futures.stream().
map(future -> {
try {
//if only I could set an individual timeout
return future.get(timeout, timeUnit);
} catch (Exception e) {
future.cancel(true);
int curExceptionCnt = exceptionCount.incrementAndGet();
if(curExceptionCnt >= allowedExceptions){
//I would've hoped that it will throw it to the calling try-catch
//and then cancel all futures, but it doesn't
throw new RuntimeException(e);
}
else{
return null;
}
}
}).
collect(Collectors.<T>toList())
).get(timeout, timeUnit);
} catch (Exception e) {
allDoneFuture.cancel(true);
throw new RuntimeException(e);
}
}
解决方案
要在一定数量的异常之后取消所有剩余的期货,您可以调用exceptionally
它们中的每一个并增加异常计数并可能在其中取消它们。
对于单个超时,您可以创建一个类来保存带有超时的未来,然后根据超时对它们进行排序,并get
用超时减去经过的时间进行调用。
static class FutureWithTimeout<T> {
CompletableFuture<T> f;
long timeout;
TimeUnit timeUnit;
FutureWithTimeout(CompletableFuture<T> f, long timeout, TimeUnit timeUnit) {
this.f = f;
this.timeout = timeout;
this.timeUnit = timeUnit;
}
}
public static <T> List<T> extractFromFutures(List<FutureWithTimeout<T>> futures, int allowedExceptions) {
AtomicInteger exceptionCount = new AtomicInteger(0);
futures.forEach(f -> f.f.exceptionally(t -> {
if(exceptionCount.getAndIncrement() == allowedExceptions){
futures.forEach(c -> c.f.cancel(false));
}
return null;
}));
long t = System.nanoTime();
return futures.stream()
.sorted(Comparator.comparingLong(f -> f.timeUnit.toNanos(f.timeout)))
.map(f -> {
try {
return f.f.get(Math.max(0, f.timeUnit.toNanos(f.timeout) - (System.nanoTime() - t)),
TimeUnit.NANOSECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException ex) {
f.f.cancel(false);
return null;
}
})
.collect(Collectors.toList());
}
请注意,这可能会以与传入的顺序不同的顺序返回列表。如果您需要它以相同的顺序,那么您可以将其更改map().collect()
为 a forEachOrdered
,然后在不排序后将它们重新映射到它们的结果中。
mayInterruptIfRunning
参数 to也cancel
没有影响,CompletableFuture
所以我将其更改为 false。
推荐阅读
- dart - 如何在 Flutter 中使卡片小部件可触摸?
- sql - 从同一列中选择具有特定多个值的行?
- java - 如何单击在 Selenium webdriver 中的多个 div 标签内写入的 span 按钮
- linux - 在运行 Apache 服务器的 Ubuntu 机器上播放 VLC,没有播放
- mysql - 按大小写排序:其中值 LIKE,然后是 POSITION
- reactjs - 如何向第三方组件插入附加元素?
- python - Django / Google Cloud:致命:数据库
不存在/服务器错误 500 - apache-spark - 将火花数据帧写入单个镶木地板文件
- java - POJO 类到 RAML 文档的转换
- java - 为嵌入式流设备选择 JVM