首页 > 解决方案 > Java(Scala)阻塞队列可以允许队列跳线(更高优先级)吗?

问题描述

可以LinkedBlockingQueue用来阻止操作。假设我在队列中只有 1 个元素,并且每个人都可以使用它。

val q = new LinkedBlockingQueue() // 1 element in it
def fun() = {
  val instance = q.take()
  // do some operations
}
def foo() = {
  val instance = q.take()
  // do some operations
}
// Use 3 threads to run following 3 methods, and the order they call q.take() is following
fun()
fun() // will wait for first fun()
foo() // will wait for second fun()

这些方法完成的顺序是fun(), fun(), foo()

但是,现在我想设置foo更高的优先级,这意味着允许它成为队列跳跃者。可以在foo第二个之前采取实例fun()(当第二个fun正在等待时,foo跳到它的前面)

他们完成的顺序可能变成fun(), foo(), fun(), (第一个fun会接受实例,因为实例可用,第二个应该等待,然后foo也等待,但跳到了 second 的前面fun

有可能吗?或者是否有任何其他可能的数据结构

标签: javascalaconcurrencyblocking

解决方案


我不知道任何用于此任务的内置工具,但实施起来不会太难。由于您只想交换单个元素,因此不需要队列而是交换器。

一个简单的实现可能看起来像

class SingleElementExchanger<T> {
    int priorityConsumer;
    T value;

    public synchronized void set(T newValue) throws InterruptedException {
        Objects.requireNonNull(newValue);
        while(value != null) wait();
        value = newValue;
        notifyAll();
    }

    public synchronized T ordinaryGet() throws InterruptedException {
        while(priorityConsumer != 0 || value == null) wait();
        T received = value;
        value = null;
        notifyAll();
        return received;
    }

    public synchronized T priorityGet() throws InterruptedException {
        priorityConsumer++;
        try {
            while(value == null) wait();
            T received = value;
            value = null;
            notifyAll();
            return received;
        }
        finally {
            priorityConsumer--;
        }
    }
}

对于您的两个普通消费者和一个优先消费者和少数生产者来说,这可能已经足够了。

对于大量线程,您可能希望使用Lock, 来通知正确的一方,而不是使用notifyAll().

class SingleElementExchanger<T> {
    final Lock lock = new ReentrantLock();
    final Condition empty = lock.newCondition(),
        fullNoPri = lock.newCondition(), fullPri = lock.newCondition();

    int priorityConsumer;
    T value;

    public void set(T newValue) throws InterruptedException {
        Objects.requireNonNull(newValue);
        lock.lock();
        try {
            while(value != null) empty.await();
            value = newValue;
            (priorityConsumer==0? fullNoPri: fullPri).signal();
        }
        finally {
            lock.unlock();
        }
    }

    public T ordinaryGet() throws InterruptedException {
        lock.lock();
        try {
            while(priorityConsumer != 0 || value == null) fullNoPri.await();
            T received = value;
            value = null;
            empty.signal();
            return received;
        }
        finally {
            lock.unlock();
        }
    }

    public T priorityGet() throws InterruptedException {
        lock.lock();
        try {
            priorityConsumer++;
            while(value == null) fullPri.await();
            T received = value;
            value = null;
            empty.signal();
            return received;
        }
        finally {
            priorityConsumer--;
            lock.unlock();
        }
    }
}

推荐阅读