首页 > 解决方案 > 具有 CompletableFutures 且没有自定义 Executor 的代码是否仅使用等于内核数的线程数?

问题描述

我正在阅读 java 8 in action,第 11 章(关于CompletableFutures),它让我想到了我公司的代码库。

java 8 in action book 说,如果你有我在下面写的代码,你一次只能使用 4CompletableFuture秒(如果你有一台 4 核计算机)。这意味着如果您想异步执行例如 10 个操作,您将首先运行前 4 个CompletableFuture,然后是第二个 4,然后是剩下的 2 个,因为默认ForkJoinPool.commonPool()只提供等于Runtime.getRuntime().availableProcessors().

在我公司的代码库中,有一个@Service名为AsyncHelpers 的类,其中包含一个方法load(),它使用CompletableFutures 在单独的块中异步加载有关产品的信息。我想知道他们是否一次只使用 4 个线程。

我公司的代码库中有几个这样的异步助手,例如,一个用于产品列表页面(PLP),一个用于产品详细信息页面(PDP)。产品详细信息页面是专门用于显示其详细特征、交叉销售产品、类似产品和更多内容的特定产品的页面。

有一个架构决定以块的形式加载 pdp 页面的详细信息。加载应该是异步发生的,当前代码使用CompletableFutures. 让我们看一下伪代码:

static PdpDto load(String productId) {
    CompletableFuture<Details> photoFuture =
            CompletableFuture.supplyAsync(() -> loadPhotoDetails(productId));
    CompletableFuture<Details> characteristicsFuture =
            CompletableFuture.supplyAsync(() -> loadCharacteristics(productId));
    CompletableFuture<Details> variations =
            CompletableFuture.supplyAsync(() -> loadVariations(productId));

    // ... many more futures

    try {
        return new PdpDto( // construct Dto that will combine all Details objects into one
                photoFuture.get(),
                characteristicsFuture.get(),
                variations.get(),
                // .. many more future.get()s
        );
    } catch (ExecutionException|InterruptedException e) {
        return new PdpDto(); // something went wrong, return an empty DTO
    }
}

如您所见,上面的代码没有使用自定义执行器。

这是否意味着如果该加载方法有 10CompletableFuture秒并且当前有 2 个人加载 PDP 页面,而我们CompletableFuture总共有 20 秒要加载,那么所有这 20CompletableFuture秒不会一次全部执行,而只有 4 秒一次?

我的同事告诉我每个用户将获得 4 个线程,但我认为 JavaDoc 非常清楚地说明了这一点:

public static ForkJoinPool commonPool() 返回公共池实例。这个池是静态构建的;它的运行状态不受shutdown() 或shutdownNow() 尝试的影响。然而,这个池和任何正在进行的处理都会在程序 System.exit(int) 时自动终止。任何依赖异步任务处理在程序终止之前完成的程序都应该在退出之前调用 commonPool().awaitQuiescence。

这意味着我们网站的所有用户只有 1 个具有 4 个线程的池。

标签: javamultithreadingjava-8futurecompletable-future

解决方案


是的,但比这更糟糕...

公共池的默认大小比处理器/内核数少1(如果只有 1 个处理器,则为 1),因此您实际上一次处理3个,而不是 4 个。

但是您最大的性能损失是并行流(如果您使用它们),因为它们也使用公共池。流旨在用于超快速处理,因此您不希望它们与繁重的任务共享资源。

如果您有设计为异步的任务(即花费超过几毫秒),那么您应该创建一个池来运行它们。这样的池可以由所有调用线程静态创建和重用,从而避免创建池的开销每次使用。您还应该通过对代码进行压力测试来调整池大小,以找到最佳大小以最大化吞吐量并最小化响应时间。


推荐阅读