首页 > 解决方案 > 同步并发请求以共享慢操作的结果

问题描述

我有一个 Java UI 服务,它有一个 API 方法,该方法调用一个相对较慢的操作(比如 ~30 秒)。该操作是无参数的,但它对随时间变化(相对缓慢)的外部数据进行操作。该方法返回最新结果并不重要——如果它们是 30 秒旧的,这是可以接受的。

最终我需要优化慢操作的实现,但作为一个短期修复,我想让操作互斥,这样如果第二个传入请求(在单独的线程上)尝试调用操作,而另一个已经在进行中,然后第二个阻塞直到第一个完成。然后第二个线程使用第一次调用操作的结果——即它不会尝试再次运行该操作。

例如:

class MyService {
    String serviceApiMmethod() {
       // If a second thread attempts to call this method while another is in progress
       // then block here until the first returns and then use those results
       // (allowing it to return immediately without a second call to callSlowOperation).
       return callSlowOperation();
    }
}

Java(8)中首选的通用方法是什么。我猜我可以使用 CountDownLatch,但目前尚不清楚如何最好地跨线程共享结果。是否存在促进这一点的现有并发原语?

编辑:一旦所有线程都使用了结果(即返回给调用者),我需要清除对结果的任何引用,因为它是相对较大的对象,需要尽快进行 GC。

标签: javaconcurrencycountdownlatch

解决方案


简单的想法

版本 1:

class Foo {
    public String foo() throws Exception {
        synchronized (this) {
            if (counter.incrementAndGet() == 1) {
                future = CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(1000 * (ThreadLocalRandom.current().nextInt(3) + 1));
                    } catch (InterruptedException e) {
                    }
                    return "ok" + ThreadLocalRandom.current().nextInt();
                });
            }
        }

        String result = future.get();
        if (counter.decrementAndGet() == 0) {
            future = null;
        }

        return result;
    }

    private AtomicInteger counter = new AtomicInteger();
    private Future<String> future;
}

版本 2:与@AleksandrSemyannikov 一起

public class MyService {
    private AtomicInteger counter = new AtomicInteger();
    private volatile String result;

    public String serviceApiMethod() {
        counter.incrementAndGet();
        try {
            synchronized (this) {
                if (result == null) {
                    result = callSlowOperation();
                }
            }
            return result;
        } finally {
            if (counter.decrementAndGet() == 0) {
                synchronized (this) {
                    if (counter.get() == 0) {
                        result = null;
                    }
                }
            }
        }
    }

    private String callSlowOperation() {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return Thread.currentThread().getName();
    }
}

推荐阅读