asynchronous - 是否可以动态地将新的 CompletableFutures 添加到 CompletableFuture.allOf() ?
问题描述
我有一个包含 3 个表的数据库:
表 A 包含 A 对象的数据
表 B 包含 B 对象的数据
表 C 包含 C 对象的数据
A 对象可以有 0 或 1 个 B 对象
B 对象可以有 0 或 1 个 C 对象
(我知道,这些可能只在一个表中,但仅用于示例)
我想从整个数据库中创建一个 csv 文件:每一行应该包含一个 A 对象,可选的 B 对象,以及可选的 C 对象。
对于每个表,都有一个异步存储库,它返回一个 CompletionStage。因此,当我从存储库 A 中获取 A 对象时,我得到了一个CompletionStage<List<A>>
. 完成后,我为每个 A 对象制作一个 Map ,用 A 的数据填充它,然后调用repositoryB.getB(A.id)
,它返回 a CompletionStage<Optional<B>>
。如果 B 值不存在,我会在我的 CSV 文件中追加一个新行,其中包含地图内的数据。如果 B 存在,我将其值添加到地图中,并调用repositoryC.getC(B.id)
返回 a CompletionStage<Optional<C>>
。如果存在 C,我将其值添加到 Map,并在 CSV 文件中添加新行,如果不存在,则添加新行。
当所有 CompletionStages 完成时,CSV 的创建就完成了。我尝试使用 CompletableFuture.allOf(),但由于一开始我不知道会有多少 CompletionStages,我无法将它们全部添加到 allOf 方法中,所以我认为我需要添加以某种方式动态完成阶段。可能吗?
目前我有一个可行的解决方案,但它在每次 B 和 C 提取后都会阻塞,所以我想让整个代码成为非阻塞的。
这是我的非阻塞尝试,但效果不佳,因为某些 B 和 C 期货未添加到期货列表中,因此代码不会等待它们完成:
CompletableFuture<List<CompletableFuture>> genereteCSV = repositoryA.getAs().thenApplyAsync(listA-> {
List<CompletableFuture> futures = new ArrayList<>();
for (A a : listA) {
Map<String, String> values = new Map<>();
addAvaluesToMap(values, A);
CompletableFuture Bfuture = repositoryB.getB(A.id).thenAcceptAsync((optionalB -> {
if (optionalB.isPresent()) {
addValuesToMap(values, B);
CompletableFuture Cfuture = repositoryC.getC(B.id).thenAcceptAsync(optionalC-> {
if (optionalC.isPresent()) {
addAvaluesToMap(values, C);
}
addMapValuesToCSV(values);
});
futures.add(Cfuture);
} else {
addMapValuesToCSV(values);
}
}));
futures.add(Bfuture);
}
return futures;
});
geerateCSV.thenApplyAsync(futureList-> CompletableFuture.allOf(futureList.toArray(new CompletableFuture<?>[0])))
.thenAccept(dummy->{System.out.println("CsV generation done");});
解决方案
基本上,这是实现非阻塞处理的一种可能方案:
为每个对象 A创建
CompletableFuture
(可能填充有 B 和 C 对象)CompletableFuture
从s异步收集 A 对象将创建的 A 对象写入 CSV 文件
注意:在下面的示例中,我使用addAvaluesToMap
并addMapValuesToCSV
假设您让它们正常工作。另外,我假设您对CompletableFuture
s 的使用符合您的目标。
这将是上述方法的实现:
public void generateCSV() {
repositoryA.getAs().thenAccept(listA -> {
List<CompletableFuture<A>> futures = listA.stream()
.map(a -> repositoryB.getB(a.id).thenComposeAsync(optionalB ->
optionalB.map(b -> repositoryC.getC(b.id).thenComposeAsync(optionalC -> {
a.setB(b);
return optionalC.map(c -> {
b.setC(c);
return CompletableFuture.completedFuture(a);
}).orElse(CompletableFuture.completedFuture(a));
})
).orElse(CompletableFuture.completedFuture(a)))
).collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
.thenAccept(v -> futures.stream()
.map(CompletableFuture::join)
.forEach(a -> {
Map<String, String> values = new HashMap<>();
addAvaluesToMap(values, a);
addMapValuesToCSV(values);
})
)
.exceptionally(throwable -> {
System.out.println("Failed generating CSV. Error: " + throwable);
return null;
});
}).exceptionally(throwable -> {
System.out.println("Failed to get list of As. Error: " + throwable);
return null;
});
}
推荐阅读
- java - Websocket 客户端?
- java - Mockito / Junit5 org.opentest4j.AssertionFailedError for getter()
- node.js - NodeJs findOneAndUpdate 获取修改的列
- android - Android 版 Google 登录中的问题
- xquery - 如何使用 cts:values/cts:element-attribute-values 过滤条件
- sql - 如何从 PL/SQL 查询(使用 dbms_output)返回值到 JMeters 查看结果树?
- angular - 在 http.get 中获取大数据 (>256 MB)
- excel - 使用初始名称创建单词
- javascript - Unlayer EmailEditor 导入 React 中断应用程序
- sql - 从日期时间 SQL 中提取时间