首页 > 解决方案 > 使用 CompletableFuture 和线程池处理大量任务

问题描述

我有一个场景,我需要使用阻塞 API 向服务器发送 1M 消息。API 不接受批处理请求,所以我必须一一发送 1M 消息。

我没有使用一个线程,而是考虑使用多个线程来发送它们。

调用者必须等待所有 1M 消息发送完毕才能继续。

我的实现如下:

public class MySender {
    private final MyPublisher myPublisher;
    private final ExecutorService threadPool;
    private final Map<String, List<CompletableFuture<Void>>> jobMap = Maps.newConcurrentMap();

    public MySender (final MyPublisher myPublisher,
                     ExecutorService threadPool) {
        this.myPublisher= myPublisher;
        this.threadPool = threadPool;
    }

    public void send(final MyData event) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> doSubmit(event), threadPool);
        List<CompletableFuture<Void>> futureList = jobMap.computeIfAbsent(event.getID(), entry -> new ArrayList<>());
        futureList.add(future);
    }

    public void notifySendComplete(final String id) {
        if(!jobMap.containsKey(id)) {
            return;
        }

        jobMap.get(id).forEach(CompletableFuture::join);
        jobMap.remove(id);
    }

    private void doSubmit(final MyData event) {
         try {
             ....
             myPublisher.send(event);
             ....
         } catch(Exception e) {
             // log error
         }
    }
}

客户端类可以这样简单地使用这个类:

myInputList.forEach(input -> {
    MyData event = createData(input);
    mySender.send(event);
})

mySender.notifySendComplete();

我认为这个实现会起作用,但问题很明显。它需要在 map 中持有 1M CompletableFuture,不符合垃圾回收条件。

这是一个大问题吗?如果是这样,有没有更好的方法?

限制:

  1. 线程池无法关闭
  2. 我可以使用 CountDownLatch 实现它,但不允许在我的项目中使用它。

标签: javamultithreading

解决方案


您可能不仅要多线程您的解决方案,而且要使用 Bulkhead 来限制等待请求的数量,因为如果目前没有限制,如果您用 1M 命中服务所有者,他们将为您设置配额要求。看看 Histryx Bulkhead,它为您管理线程池,您可以调整最大并发线程Hystrix 使用的 Bulkhead Pattern 是什么? 另外,正如其他人提到的,您从内存中的 1M 记录开始,这些记录来自哪里?如果来自数据库,您可能需要考虑使用 R2DB 驱动程序来使用可以在加载消息时处理消息的反应流,而不是将所有消息加载到内存然后处理。见https://www.baeldung.com/java-reactive-systems


推荐阅读