首页 > 解决方案 > CompletableFuture:转型与组合

问题描述

请考虑“Modern Java in Action”一书(第 2 版,清单 16.16,第 405 页)中的一个示例。在那里,我们有三个 map 操作来从一个流中的所有商店获取产品的折扣价格列表。首先,我们联系每个商店以获取包含非折扣价格和折扣类型的响应,然后将响应解析为 Quote 对象,并将其传递给远程折扣服务,该服务返回一个带有已折扣价格的字符串。

public List<String> findPrices(String product) {

    List<CompletableFuture<String>> priceFutures =
        shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(
                    () -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote ->
                    CompletableFuture.supplyAsync(
                        () -> Discount.applyDiscount(quote), executor)))
            .collect(toList());

    return priceFutures.stream()
            .map(CompletableFuture::join)
            .collect(toList());
}

thenApply我的问题不是关于和之间的区别thenCompose。我相信,后者用于避免像CompletableFuture<CompletableFuture<...>>. 但是,我不明白,为什么我们需要在CompletableFuture这里创建另一个级别?似乎作者通过创建然后展平嵌套来为代码添加了一些人为的复杂性CompletableFuture,而不是像这样简单地thenApplyAsync在第三张地图中使用:

            .map(shop -> CompletableFuture.supplyAsync(
                    () -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenApplyAsync(Discount::applyDiscount, executor))

这两种映射用法(原始的 withthenCompose和一个 with thenApplyAsync)是否等效?两者都接受先前映射的结果作为参数,都提供自定义执行器来执行任务,并且都返回相同的CompletableFuture<String>结果。

标签: javamappingcompletable-future

解决方案


是的,thenComposeandsupplyAsync实现和thenApplyAsync直接使用一样。

我没有读过这本书,但可能是某些示例代码专注于某个主题或功能,而不是最简洁或最快的代码。因此,假设您正在考虑使用类似的代码,我会留下一些建议。


关于这段代码的另一个建议是,CompletableFuture通过连续调用将每个链接链接起来有点奇怪map。似乎当前示例构建在先前Stream基于具有多个调用的方法之上,并保持原样但使用CompletableFuture.

我更喜欢单个map并直接链接每个CompletableFuture,这也允许将其重构为自己的方法。

所以这:

            .map(shop -> CompletableFuture.supplyAsync(
                    () -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenApplyAsync(Discount::applyDiscount, executor))

会变成这样:

            .map(shop ->
                CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)
                .thenApply(Quote::parse)
                .thenApplyAsync(Discount::applyDiscount, executor))

这个 lambda 很容易变成一个方法,可以在没有 a 的情况下重用Stream,它可以与另一个 组合CompletableFuture,可以测试,可以模拟等。


另一个建议是让你的代码一直异步,这样findPrices就不会阻塞join(或者get,就此而言)。

阻塞的问题在于它可能会阻塞执行器上最后一个可用的线程,从而因线程耗尽而引发死锁。您的代码所依赖的异步代码,最终需要在执行程序上运行,可能永远不会运行。

public CompletableFuture<List<String>> findPricesAsync(String product) {
    // List<CompletableFuture<String>> priceFutures = ...

    CompletableFuture<Void> all = CompletableFuture.allOf(priceFutures.toArray(new CompletableFuture<String>[priceFutures.size()]));
    return all.thenRun(() -> priceFutures.stream()
        .map(CompletableFuture::join));
}

请注意,返回类型从 更改List<String>CompletableFuture<List<String>>。另请注意,最后一次调用join不会阻塞,因为CompletableFuture将调用它的每个都已完成。


最后,我倾向于返回CompletionStage,因为它允许除CompletableFuture. 我还假设返回的对象也实现了Future,它允许get在结果上使用,但不是join,区别在于声明的抛出异常类型。

在我让类似 NIO 的方法返回CompletionStage异步 I/O 的一种情况下,我实现了一个子类,该子类CompletableFuture覆盖了每个*Async没有 executor 参数的方法中使用的默认执行程序。从 Java 9 开始,这变得更容易了,仍然通过子类化,但它只需要覆盖defaultExecutor. 我进行子类化的主要原因是使用组合的替代方法会导致更多的代码(包装结果等等)。另一个原因,但不是我真正担心的,是每个实例都有一个额外的对象要被垃圾收集。

这只是为了证明可能存在CompletionStage实际需要自定义实现的情况,这可能是也可能不是CompletableFuture.


推荐阅读