首页 > 解决方案 > Java - CompletableFutures - 如果有异常,我如何取消所有期货

问题描述

我有一个方法(包括在下面)来返回CompletableFutures 列表的值。

该方法应该:

  1. 能够在给定时间后超时。
  2. 如果异常数量超过 n 个,则能够取消所有期货。

第一点效果很好,并且在超过超时限制后确实会爆炸。(我仍然需要在exectuorService.shutdownNow()之后打电话才能返回给来电者)。我遇到的问题是我要完成的第二件事。

假设我有一个包含 20,000 个期货的列表,它们都会有一个例外,那么为什么要让它们全部执行,如果我发现有太多例外,那么我认为所有期货都有问题并且我想取消它们。

此外,我希望每个未来单独超时,这可能需要多长时间,但这也行不通,出于下面概述的相同原因,谦虚。

原因似乎是,因为当我调用 时allDoneFuture.thenApply(),此时它等待并让所有期货完成,无论是成功还是异常。只有在所有这些都完成后,它才会遍历每个未来并获取其结果。到那时,当他们已经完成时,取消有什么好处。

如果有人能告诉我如何实现这一特定需求,我将不胜感激:“监控异常和个别超时,并在此基础上取消所有其他”。

谢谢。

下面是我写的方法:

/**
     * @param futures a list of completable futures
     * @param timeout how long to allow the futures to run before throwing exception
     * @param timeUnit unit of timeout
     * @param allowedExceptions how many of the futures do we tolerate exceptions,
     * NOTE: if an exception is thrown from the futures it will return null, until it reaches the allowedExceptions threshold
     * */
    public static <T> List<T> extractFromFutures(List<CompletableFuture<T>> futures, int timeout, TimeUnit timeUnit, int allowedExceptions) {
        CompletableFuture<Void> allDoneFuture = CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[futures.size()]));
        try {
            AtomicInteger exceptionCount = new AtomicInteger(0);
            return allDoneFuture.thenApply(v ->//when all are done
                    futures.stream().
                            map(future -> {
                                try {
                                    //if only I could set an individual timeout
                                    return future.get(timeout, timeUnit);
                                } catch (Exception e) {
                                    future.cancel(true);
                                    int curExceptionCnt = exceptionCount.incrementAndGet();
                                    if(curExceptionCnt >= allowedExceptions){
                                        //I would've hoped that it will throw it to the calling try-catch 
                                        //and then cancel all futures, but it doesn't
                                        throw new RuntimeException(e);
                                    }
                                    else{
                                        return null;
                                    }
                                }
                            }).
                            collect(Collectors.<T>toList())
            ).get(timeout, timeUnit);
        } catch (Exception e) {
            allDoneFuture.cancel(true);
            throw new RuntimeException(e);
        }
    }

标签: javamultithreadingcompletable-future

解决方案


要在一定数量的异常之后取消所有剩余的期货,您可以调用exceptionally它们中的每一个并增加异常计数并可能在其中取消它们。

对于单个超时,您可以创建一个类来保存带有超时的未来,然后根据超时对它们进行排序,并get用超时减去经过的时间进行调用。

static class FutureWithTimeout<T> {
    CompletableFuture<T> f;
    long timeout;
    TimeUnit timeUnit;

    FutureWithTimeout(CompletableFuture<T> f, long timeout, TimeUnit timeUnit) {
        this.f = f;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
    }
}
public static <T> List<T> extractFromFutures(List<FutureWithTimeout<T>> futures, int allowedExceptions) {
    AtomicInteger exceptionCount = new AtomicInteger(0);
    futures.forEach(f -> f.f.exceptionally(t -> {
        if(exceptionCount.getAndIncrement() == allowedExceptions){
            futures.forEach(c -> c.f.cancel(false));
        }
        return null;
    }));
    long t = System.nanoTime();
    return futures.stream()
        .sorted(Comparator.comparingLong(f -> f.timeUnit.toNanos(f.timeout)))
        .map(f -> {
            try {
                return f.f.get(Math.max(0, f.timeUnit.toNanos(f.timeout) - (System.nanoTime() - t)), 
                    TimeUnit.NANOSECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException ex) {
                f.f.cancel(false);
                return null;
            }
        })
        .collect(Collectors.toList());
}

请注意,这可能会以与传入的顺序不同的顺序返回列表。如果您需要它以相同的顺序,那么您可以将其更改map().collect()为 a forEachOrdered,然后在不排序后将它们重新映射到它们的结果中。

mayInterruptIfRunning参数 to也cancel没有影响,CompletableFuture所以我将其更改为 false。


推荐阅读