java - Java(Scala)阻塞队列可以允许队列跳线(更高优先级)吗?
问题描述
可以LinkedBlockingQueue
用来阻止操作。假设我在队列中只有 1 个元素,并且每个人都可以使用它。
val q = new LinkedBlockingQueue() // 1 element in it
def fun() = {
val instance = q.take()
// do some operations
}
def foo() = {
val instance = q.take()
// do some operations
}
// Use 3 threads to run following 3 methods, and the order they call q.take() is following
fun()
fun() // will wait for first fun()
foo() // will wait for second fun()
这些方法完成的顺序是fun(), fun(), foo()
但是,现在我想设置foo
更高的优先级,这意味着允许它成为队列跳跃者。可以在foo
第二个之前采取实例fun()
(当第二个fun
正在等待时,foo
跳到它的前面)
他们完成的顺序可能变成fun(), foo(), fun()
, (第一个fun
会接受实例,因为实例可用,第二个应该等待,然后foo
也等待,但跳到了 second 的前面fun
)
有可能吗?或者是否有任何其他可能的数据结构
解决方案
我不知道任何用于此任务的内置工具,但实施起来不会太难。由于您只想交换单个元素,因此不需要队列而是交换器。
一个简单的实现可能看起来像
class SingleElementExchanger<T> {
int priorityConsumer;
T value;
public synchronized void set(T newValue) throws InterruptedException {
Objects.requireNonNull(newValue);
while(value != null) wait();
value = newValue;
notifyAll();
}
public synchronized T ordinaryGet() throws InterruptedException {
while(priorityConsumer != 0 || value == null) wait();
T received = value;
value = null;
notifyAll();
return received;
}
public synchronized T priorityGet() throws InterruptedException {
priorityConsumer++;
try {
while(value == null) wait();
T received = value;
value = null;
notifyAll();
return received;
}
finally {
priorityConsumer--;
}
}
}
对于您的两个普通消费者和一个优先消费者和少数生产者来说,这可能已经足够了。
对于大量线程,您可能希望使用Lock
, 来通知正确的一方,而不是使用notifyAll()
.
class SingleElementExchanger<T> {
final Lock lock = new ReentrantLock();
final Condition empty = lock.newCondition(),
fullNoPri = lock.newCondition(), fullPri = lock.newCondition();
int priorityConsumer;
T value;
public void set(T newValue) throws InterruptedException {
Objects.requireNonNull(newValue);
lock.lock();
try {
while(value != null) empty.await();
value = newValue;
(priorityConsumer==0? fullNoPri: fullPri).signal();
}
finally {
lock.unlock();
}
}
public T ordinaryGet() throws InterruptedException {
lock.lock();
try {
while(priorityConsumer != 0 || value == null) fullNoPri.await();
T received = value;
value = null;
empty.signal();
return received;
}
finally {
lock.unlock();
}
}
public T priorityGet() throws InterruptedException {
lock.lock();
try {
priorityConsumer++;
while(value == null) fullPri.await();
T received = value;
value = null;
empty.signal();
return received;
}
finally {
priorityConsumer--;
lock.unlock();
}
}
}
推荐阅读
- java - 从 recyclerview 适配器开始新活动
- amazon-web-services - Dynamodb - 使用 GraphQL 扫描地图属性
- php - Laravel插入正常图像,值变为空
- arrays - React过滤Json数组//TypeError:includes is not a function
- python - 从 python 字典中的多个键中删除重复的值项并附加到新字典
- sql - 使用 AND 运算符的 Oracle 更新问题
- javascript - 将阅读更多按钮添加到字符串 html 反应
- angular - 无法在角度指令中删除事件监听器
- javascript - 如何在Angular中传递基于接口的属性
- java - 没有中间变量,泛型代码无法编译