java - 使用 CompletableFuture 和线程池处理大量任务
问题描述
我有一个场景,我需要使用阻塞 API 向服务器发送 1M 消息。API 不接受批处理请求,所以我必须一一发送 1M 消息。
我没有使用一个线程,而是考虑使用多个线程来发送它们。
调用者必须等待所有 1M 消息发送完毕才能继续。
我的实现如下:
public class MySender {
private final MyPublisher myPublisher;
private final ExecutorService threadPool;
private final Map<String, List<CompletableFuture<Void>>> jobMap = Maps.newConcurrentMap();
public MySender (final MyPublisher myPublisher,
ExecutorService threadPool) {
this.myPublisher= myPublisher;
this.threadPool = threadPool;
}
public void send(final MyData event) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> doSubmit(event), threadPool);
List<CompletableFuture<Void>> futureList = jobMap.computeIfAbsent(event.getID(), entry -> new ArrayList<>());
futureList.add(future);
}
public void notifySendComplete(final String id) {
if(!jobMap.containsKey(id)) {
return;
}
jobMap.get(id).forEach(CompletableFuture::join);
jobMap.remove(id);
}
private void doSubmit(final MyData event) {
try {
....
myPublisher.send(event);
....
} catch(Exception e) {
// log error
}
}
}
客户端类可以这样简单地使用这个类:
myInputList.forEach(input -> {
MyData event = createData(input);
mySender.send(event);
})
mySender.notifySendComplete();
我认为这个实现会起作用,但问题很明显。它需要在 map 中持有 1M CompletableFuture,不符合垃圾回收条件。
这是一个大问题吗?如果是这样,有没有更好的方法?
限制:
- 线程池无法关闭
- 我可以使用 CountDownLatch 实现它,但不允许在我的项目中使用它。
解决方案
您可能不仅要多线程您的解决方案,而且要使用 Bulkhead 来限制等待请求的数量,因为如果目前没有限制,如果您用 1M 命中服务所有者,他们将为您设置配额要求。看看 Histryx Bulkhead,它为您管理线程池,您可以调整最大并发线程Hystrix 使用的 Bulkhead Pattern 是什么? 另外,正如其他人提到的,您从内存中的 1M 记录开始,这些记录来自哪里?如果来自数据库,您可能需要考虑使用 R2DB 驱动程序来使用可以在加载消息时处理消息的反应流,而不是将所有消息加载到内存然后处理。见https://www.baeldung.com/java-reactive-systems
推荐阅读
- c# - 为动态 exec 存储过程构造参数列表的任何更简洁的方法?
- firebase - Flutter with Firebase JWT 向 Heroku Hasura 发送 GraphQL (graphql_flutter) 请求,该请求具有“格式错误的授权标头”
- powershell - Powershell密码解密
- python - 在 pandas DataFrame 中按季节对月份的列进行子集
- php - PHP intl 扩展在启用后未加载
- javascript - 是否可以从通过 video.js 开始的视频请求中查看响应标头?
- docker - Bitbucket 管道:Docker 登录到私有 ECR 成功,但拉取失败
- python - 如何在树中插入新块?
- angular - nb-checkbox 被选中/true 全部而不是选中一个
- c - 如何锁定文件以使其他进程无法捕获它?