首页 > 解决方案 > 如何设计一个具有互斥但独立的并发方法的任务队列?

问题描述

我在一次工作面试中被问到一个并发问题,最终归结为以下要求。我能够通过使用互斥来实现#2 到#4,但不是#1。

使用以下方法设计任务队列:

public void registerCallback(Runnable task)

public void eventFired()

  1. 多个线程应该能够将任务放入队列中,可能是并发的。
  2. eventFired应该只调用一次。
  3. 如果eventFired先前已调用过,则以后对任一方法的任何调用都应引发异常。
  4. 如果在执行eventFired时被调用,registerCallback则延迟触发事件直到稍后时间。
  5. 如果在执行registerCallback时被调用,eventFired则抛出异常。

ReentrantReadWriteLock看起来很有希望,因为registerCallback可以获得读锁和eventFired写锁,但这并不能解决registerCallback调用的竞争条件,然后eventFired.

有任何想法吗?

标签: javamultithreadingconcurrencyjava.util.concurrent

解决方案


ReentrantReadWriteLock看起来很有希望,因为registerCallback可以获得读锁和eventFired写锁,但这并不能解决registerCallback调用的竞争条件,然后eventFired.

注册回调,换句话说,修改数据结构,同时持有读锁,绝不是一个好主意。您需要另一个线程安全的构造来存储回调并不必要地使代码复杂化。

注册回调的操作,例如将引用存储到某个集合或实例化任何类型的节点对象,非常简单,可以使用普通的互斥锁,因为它不会长时间持有。

无论您使用synchronized、 a还是支持原子更新的状态,在任何一种情况下,都不存在竞争条件,因为orLock不能重叠。如果使用得当,所有这些方法都会为这些操作带来顺序。所以每一个都是,在第一个之前或之后。registerCallbackeventFiredregisterCallbackeventFired

实现这一点可以很简单:

public class JobQueue {
    private List<Runnable> callbacks = new ArrayList<>();

    public synchronized void registerCallback(Runnable task) {
        if(callbacks == null) {
            throw new IllegalStateException("Event already fired");
        }
        callbacks.add(Objects.requireNonNull(task));
    }
    public void eventFired() {
        List<Runnable> list;
        synchronized(this) {
            list = callbacks;
            callbacks = null;
        }
        if(list == null) {
            throw new IllegalStateException("Can only fire once");
        }
        for(Runnable r: list) r.run();
    }
}

块内执行的代码synchronized非常短,以至于对于大多数实际用例来说,争用都无关紧要。用 a 实现同样的Lock操作很简单,但不会有任何优势。事实上,JVM 特定的优化可能会使synchronized基于解决方案的效率更高。

为了完整起见,这里有一个基于原子更新的解决方案:

public class JobQueue {
    private static final Runnable DONE = () -> {};

    private final AtomicReference<Runnable> pending = new AtomicReference<>();

    public void registerCallback(Runnable task) {
        Objects.requireNonNull(task);
        for(;;) {
            Runnable previous = pending.get();
            if(previous == DONE) throw new IllegalStateException("Event already fired");
            if(pending.compareAndSet(previous, sequence(previous, task))) return;
        }
    }

    public void eventFired() {
        Runnable previous = pending.getAndSet(DONE);
        if(previous == DONE) throw new IllegalStateException("Can only fire once");
        if(previous != null) previous.run();
    }

    static Runnable sequence(Runnable a, Runnable b) {
        return a == null? b: () -> { a.run(); b.run(); };
    }
}

实际上,多个registerCallback和/或eventFired调用的执行可能会重叠,但在这种情况下,只有一个可以成功执行关键原子更新。这为操作带来了一个顺序,最多使一次eventFired调用成功,并将所有registerCallback调用分类为在此之前之后进行的。


推荐阅读