首页 > 解决方案 > 是否可以动态地将新的 CompletableFutures 添加到 CompletableFuture.allOf() ?

问题描述

我有一个包含 3 个表的数据库:

表 A 包含 A 对象的数据
表 B 包含 B 对象的数据
表 C 包含 C 对象的数据

A 对象可以有 0 或 1 个 B 对象
B 对象可以有 0 或 1 个 C 对象
(我知道,这些可能只在一个表中,但仅用于示例)

我想从整个数据库中创建一个 csv 文件:每一行应该包含一个 A 对象,可选的 B 对象,以及可选的 C 对象。

对于每个表,都有一个异步存储库,它返回一个 CompletionStage。因此,当我从存储库 A 中获取 A 对象时,我得到了一个CompletionStage<List<A>>. 完成后,我为每个 A 对象制作一个 Map ,用 A 的数据填充它,然后调用repositoryB.getB(A.id),它返回 a CompletionStage<Optional<B>>。如果 B 值不存在,我会在我的 CSV 文件中追加一个新行,其中包含地图内的数据。如果 B 存在,我将其值添加到地图中,并调用repositoryC.getC(B.id)返回 a CompletionStage<Optional<C>>。如果存在 C,我将其值添加到 Map,并在 CSV 文件中添加新行,如果不存在,则添加新行。

当所有 CompletionStages 完成时,CSV 的创建就完成了。我尝试使用 CompletableFuture.allOf(),但由于一开始我不知道会有多少 CompletionStages,我无法将它们全部添加到 allOf 方法中,所以我认为我需要添加以某种方式动态完成阶段。可能吗?

目前我有一个可行的解决方案,但它在每次 B 和 C 提取后都会阻塞,所以我想让整个代码成为非阻塞的。

这是我的非阻塞尝试,但效果不佳,因为某些 B 和 C 期货未添加到期货列表中,因此代码不会等待它们完成:

CompletableFuture<List<CompletableFuture>> genereteCSV = repositoryA.getAs().thenApplyAsync(listA-> {
            List<CompletableFuture> futures = new ArrayList<>();
            for (A a : listA) {
                Map<String, String> values = new Map<>();
                addAvaluesToMap(values, A);

                CompletableFuture Bfuture = repositoryB.getB(A.id).thenAcceptAsync((optionalB -> {
                    if (optionalB.isPresent()) {
                        addValuesToMap(values, B);

                        CompletableFuture Cfuture = repositoryC.getC(B.id).thenAcceptAsync(optionalC-> {
                            if (optionalC.isPresent()) {
                                addAvaluesToMap(values, C);
                            } 
                            addMapValuesToCSV(values);
                        });
                        futures.add(Cfuture);

                    } else {
                        addMapValuesToCSV(values);
                    }
                }));

                futures.add(Bfuture);
            }
            return futures;
        });

        geerateCSV.thenApplyAsync(futureList-> CompletableFuture.allOf(futureList.toArray(new CompletableFuture<?>[0])))
        .thenAccept(dummy->{System.out.println("CsV generation done");});

标签: asynchronousjava-8completable-futurecompletion-stage

解决方案


基本上,这是实现非阻塞处理的一种可能方案:

  1. 为每个对象 A创建CompletableFuture(可能填充有 B 和 C 对象)

  2. CompletableFuture从s异步收集 A 对象

  3. 将创建的 A 对象写入 CSV 文件

注意:在下面的示例中,我使用addAvaluesToMapaddMapValuesToCSV假设您让它们正常工作。另外,我假设您对CompletableFutures 的使用符合您的目标。

这将是上述方法的实现:

public void generateCSV() {
    repositoryA.getAs().thenAccept(listA -> {
        List<CompletableFuture<A>> futures = listA.stream()
                .map(a -> repositoryB.getB(a.id).thenComposeAsync(optionalB ->
                        optionalB.map(b -> repositoryC.getC(b.id).thenComposeAsync(optionalC -> {
                                    a.setB(b);
                                    return optionalC.map(c -> {
                                        b.setC(c);
                                        return CompletableFuture.completedFuture(a);
                                    }).orElse(CompletableFuture.completedFuture(a));
                                })
                        ).orElse(CompletableFuture.completedFuture(a)))
                ).collect(Collectors.toList());

        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenAccept(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .forEach(a -> {
                            Map<String, String> values = new HashMap<>();
                            addAvaluesToMap(values, a);
                            addMapValuesToCSV(values);
                        })
                )
                .exceptionally(throwable -> {
                    System.out.println("Failed generating CSV. Error: " + throwable);
                    return null;
                });
    }).exceptionally(throwable -> {
        System.out.println("Failed to get list of As. Error: " + throwable);
        return null;
    });
}

推荐阅读