首页 > 解决方案 > Java中的信号量和调度程序竞争条件

问题描述

我写了一个简单的速率限制器来限制远程服务的使用:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class SimpleRateLimiter {
    private Semaphore semaphore;
    private int maxPermits;
    private TimeUnit timePeriod;
    private ScheduledExecutorService scheduler;

    public static SimpleRateLimiter create(int permits, TimeUnit timePeriod) {
        SimpleRateLimiter limiter = new SimpleRateLimiter(permits, timePeriod);
        limiter.schedulePermitReplenishment();
        return limiter;
    }

    private SimpleRateLimiter(int permits, TimeUnit timePeriod) {
        this.semaphore = new Semaphore(permits);
        this.maxPermits = permits;
        this.timePeriod = timePeriod;
    }

    public boolean tryAcquire() {
        return semaphore.tryAcquire();
    }

    public void blockAcquire() throws InterruptedException {
        semaphore.acquire();
    }

    public void stop() {
        scheduler.shutdownNow();
        semaphore.drainPermits();
    }

    public int getPermitCount() {
        return semaphore.availablePermits();
    }

    public void schedulePermitReplenishment() {
        scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleWithFixedDelay(() -> {
            semaphore.release(maxPermits - semaphore.availablePermits());
        }, 0, 1, timePeriod);

    }
}

要使用它,我有以下内容:

SimpleRateLimiter rateLimiter = SimpleRateLimiter.create(100, TimeUnit.SECONDS);
...
//In some thread loop:
if (rateLimiter.tryAcquire()) {
    System.out.println("Permit left: " + rateLimiter.getPermitCount());
...
}

一切都很好,直到有一天它停止工作。检查日志我发现 rateLimiter.getPermitCount() 达到 104,这(我怀疑)maxPermits - semaphore.availablePermits()会变成负数并抛出异常,导致内部的单线程调度程序schedulePermitReplenishment()停止工作。我的问题是,在什么情况下 getPermitCount() 可以超过 100,因为信号量只能在内部访问并且只能由单个线程访问?

谢谢

标签: javamultithreading

解决方案


您的代码存在一些问题。但一个问题是 schedulePermitReplenishment 方法中的潜在竞争条件。

如果这将被同时调用并且您将拥有例如 9 个可用许可和 10 个最大许可,那么您最终可能会得到以下结果:

信号量.release(10 - 9);

这与 sempahore.release(1) 相同。因此,如果 2 个线程同时调用它,您最终会得到 11 个许可,而不是 10 个。

正如您已经指出的那样,下次调用 schedulePermitReplenishment 时,您最终将获得一个 10-11=-1 的许可,该许可将被释放,并且您最终会遇到一个异常(这确实被执行程序吞没了)。

为什么需要补充许可证?门票会不会丢失?因为这可能是可用票数大于“最大”票数的另一个来源。例如,如果一张票被拿走,你会打电话给补货,但票最终被退回,你也会得到太多票。

我不确定如何使用这段代码,但你也可以在 executor 字段上进行数据竞争。也许最好在构造函数中创建执行器而不是替换它。


推荐阅读