首页 > 解决方案 > 链式 CompletableFuture 的内存管理

问题描述

我正在编写一个异步客户端。每次发送请求时,客户端都会在内部创建、注册,然后返回一个CompletableFuture,一旦接收到服务器对发送请求的响应,就会完成。

问题是,有些请求永远不会收到响应,并且客户端代码在超时后放弃了它们。因此,我最终CompletableFuture得到了客户端代码不再引用的多个注册 s,因此应该有资格进行垃圾收集。

为了避免内存泄漏,我决定更改客户端的实现以将WeakReferences 维护为返回CompletableFuture的 s 而不是强的,这样如果客户端代码不再需要它们,非引用的期货就会被驱逐。

以下(简化)代码演示了客户端:

interface Message {
    Object getID();
    byte[] toByteArray();
}

abstract class AsyncClient<T extends Message, R extends Message> {
    /**
     * This map serves as a registry of CompletableFutures for sent requests.
     * 
     * Previously it was Map<Object, CompletableFuture<R>>. Changed to References
     * to avoid CompletableFutures staying in registry after the recipient code
     * does not wait for them anymore.
     */
    Map<Object, Reference<CompletableFuture<R>>> requestFutureRegistry = new ConcurrentHashMap<>();

    /**
     * Write the request to the service
     * @param serializedRequest bytes to write
     */
    protected abstract void writeRequest(byte[] serializedRequest);

    /**
     * Make sure to clean up the registry from Futures that arte not held by the 
     * recipient code anymore.
     */
    private void deleteDanglingFutures() {
        requestFutureRegistry.entrySet().removeIf(entry -> entry.getValue().get() == null);
    }

    /**
     * 
     * @param request to be written to service
     * @return a CompletableFuture for the request's response
     */
    public CompletableFuture<R> sendRequest(T request) {
        deleteDanglingFutures();

        CompletableFuture<R> future = new CompletableFuture<>();
        requestFutureRegistry.put(request.getID(), new WeakReference<>(future));

        writeRequest(request.toByteArray());

        return future;
    }

    void receivedResponse(R response) {
        deleteDanglingFutures();

        Reference<CompletableFuture<R>> futureRef = requestFutureRegistry.get(response.getID());
        if (futureRef != null) {
            CompletableFuture<R> future = futureRef.get();
            if (future != null) {
                future.complete(response);
            }
        }
    }
}

这种方法似乎有效,直到我开始CompletableFuture使用 thenXXX 方法链接 s。例如,当我执行以下操作时:

AsyncClient client;
Request request1;
// ...

Response response = client.sendRequest(request1)
                          .thenCompose(response1 -> 
                                       client.sendRequest(request2)
                           ).get(timeout, TimeUnit.MILLISECONDS);

我注意到内部未来(从对 的调用client.sendRequest(request2))有时会在它有机会完成之前得到 GC。

深入挖掘 JDK 实现,似乎没有存储对内部未来的强引用(如本文所述)。

我的问题是 - 如果客户端代码不再引用它们(直接或通过 thenXXX 链接),确保注册的期货被驱逐的正确方法是什么,并且在仍然需要它们的情况下坚持足够长的时间以完成?

编辑+动机

我现在明白(感谢@Holger)这Future::get不是CompletableFutures 的常见用法,但是在我的情况下它很常见。具体来说,我需要支持的一个用例是发送多个请求,等待第一个请求响应(使用anyOf然后get超时)。事先知道许多请求永远不会得到服务的响应,所以我需要(最好是自动)在超时后清理未使用的期货。

标签: javaasynchronousmemory-managementgarbage-collectioncompletable-future

解决方案


推荐阅读