首页 > 解决方案 > java.util.concurrent.CompletableFuture 中的异常传播

问题描述

有两个代码片段。

在第一个中,我们从总是抛出一些异常的任务中创建 CompletableFuture。然后我们将“例外”方法应用于这个未来,然后是“接受”方法。我们不会将Accept 方法返回的新未来分配给任何变量。然后我们在原始未来调用“加入”。我们看到的是“异常”方法以及“thenAccept”已被调用。我们看到它是因为他们在输出中打印了适当的行。但是异常并没有被“异常”方法抑制。在这种情况下,抑制异常并为我们提供一些默认值正是我们对“异常”的期望。

在第二个片段中,我们做了几乎相同的事情,但将新返回的未来分配给变量并在其上调用“join”。在这种情况下,正如预期的那样,异常被抑制。

从我的第一部分的观点来看,一致的行为要么不抑制异常,也不调用“异常”和“thenAccept”,或者异常调用并抑制异常。

为什么我们之间有一些东西?

第一个片段:

public class TestClass {
    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(TestClass::doSomethingForInteger);

        future.exceptionally(e -> {
                    System.out.println("Exceptionally");
                    return 42;
                })
                .thenAccept(r -> {
                    System.out.println("Accept");
                });

        future.join();
    }

    private static int doSomethingForInteger() {
        throw new IllegalArgumentException("Error");
    }
}

第二个片段:

public class TestClass {
    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(TestClass::doSomethingForInteger);

        CompletableFuture<Void> voidCompletableFuture = future.exceptionally(e -> {
            System.out.println("Exceptionally");
            return 42;
        })
                .thenAccept(r -> {
                    System.out.println("Accept");
                });

        voidCompletableFuture.join();
    }

    private static int doSomethingForInteger() {
        throw new IllegalArgumentException("Error");
    }
}

标签: javajava.util.concurrentcompletable-futureforkjoinpool

解决方案


没有“抑制异常”之类的东西。当您调用时exceptionally,您正在创建一个新的未来,如果上一阶段异常完成,它将使用前一阶段的结果或评估函数的结果来完成。前一个阶段,即您正在调用的未来exceptionally,不受影响。

这适用于链接依赖函数或操作的所有方法。这些方法中的每一种都创造了一个新的未来,它将按照记录的方式完成。它们都不会影响您正在调用该方法的现有未来。

也许,通过以下示例会变得更加清晰:

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    return "a string";
});

CompletableFuture<Integer> f2 = f1.thenApply(s -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
    return s.length();
});

f2.thenAccept(i -> System.out.println("result of f2 = "+i));

String s = f1.join();
System.out.println("result of f1 = "+s);

ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);

在这里,应该清楚的是,依赖阶段Integer的结果 a 不能取代先决条件阶段的结果 a String。这只是两个不同的未来,不同的结果。而且由于调用第一阶段join()f1结果查询,它不依赖于f2,因此甚至不等待它的完成。(这也是代码在最后等待所有后台活动结束的原因)。

的用法exceptionally不一样。在非异常情况下,下一个阶段具有相同的类型甚至相同的结果可能会令人困惑,但这并没有改变存在两个不同阶段的事实。

static void report(String s, CompletableFuture<?> f) {
    f.whenComplete((i,t) -> {
        if(t != null) System.out.println(s+" completed exceptionally with "+t);
        else System.out.println(s+" completed with value "+i);
    });
}
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    throw new IllegalArgumentException("Error for testing");
});
CompletableFuture<Integer> f2 = f1.exceptionally(t -> 42);

report("f1", f1);
report("f2", f2);

ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);

链式方法似乎有一种普遍的心态,即CompletableFuture成为单一未来的某种构建者,不幸的是,这是错误的。另一个陷阱是以下错误:

CompletableFuture<?> f = CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    System.out.println("initial stage");
    return "";
}).thenApply(s -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    System.out.println("second stage");
    return s;
}).thenApply(s -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    System.out.println("third stage");
    return s;
}).thenAccept(s -> {
    System.out.println("last stage");
});

f.cancel(true);
report("f", f);

ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);

如前所述,每个链式方法都会创建一个新阶段,因此保持对最后一个链式方法返回的阶段(即最后一个阶段)的引用适合获得最终结果。但是取消这个阶段只会取消最后一个阶段,而不会取消任何先决条件阶段。此外,在取消之后,最后一个阶段不再依赖于其他阶段,因为它已经通过取消完成并能够报告此异常结果,而其他现在不相关的阶段仍在后台进行评估。


推荐阅读