c# - Monitor.Pulse() 有条件
问题描述
我有一个类,应该是线程安全的。我最好希望使用单个同步对象来管理线程安全,以避免复杂的思维导图,因为所有方法都会更改对象状态变量。因此,我在该对象上使用 lock 语句包装方法体。在某些情况下,需要释放锁一段时间才能允许另一个线程更新状态。到目前为止一切顺利,只需使用Monitor.Wait()
and Monitor.Pulse()
。但是,我想有条件地“脉动”。在下面的代码中,我只想向在“Send()”方法中等待的线程发送“Pulse”。同样,仅向在 'Receive()' 方法中等待的线程发送 'Pulse'。
所以总结一下:
- 我更喜欢使用单个同步对象来锁定,因为这四种方法中的每一种都会改变对象状态变量。
- Wait() 应该释放锁,但等待特定的脉冲。收到通知后,应该重新获取锁。
- Pulse() 应该只通知一个等待线程,发送或接收服务员。
- 最好也可以使用 a
CancellationToken
来取消等待。
我尝试了很多东西,包括 Monitor、Semaphore 和 WaitHandle 组合、带有 WaitHandles 的队列以及更多创意选项。另外,我一直在玩多个同步对象。但在每种情况下,我只能让部分功能发挥作用。
下面的代码是我得到的最接近的。TODO 注释显示代码有什么问题。
public class Socket
{
public class Item { }
private object sync = new object();
private ManualResetEvent receiveAvailable = new ManualResetEvent(false);
private Queue<Item> receiveQueue = new Queue<Item>();
// used by client, from any thread
public void Send(Item item, CancellationToken token)
{
lock (this.sync)
{
// sends the message somewhere and should await confirmation.
// note that the confirmation order matters.
// TODO: Should only continue on notification from 'NotifySent()', and respect the cancellation token
Monitor.Wait(this.sync);
}
}
// used by client, from any thread
public Item Receive(CancellationToken token)
{
lock (this.sync)
{
if (!this.receiveAvailable.WaitOne(0))
{
// TODO: Should only be notified by 'EnqueueReceived()' method, and respect the cancellation token.
Monitor.Wait(this.sync);
}
var item = this.receiveQueue.Dequeue();
if (this.receiveQueue.Count == 0)
{
this.receiveAvailable.Reset();
}
return item;
}
}
// used by internal worker thread
internal void NotifySent()
{
lock (this.sync)
{
// Should only notify the Send() method.
Monitor.Pulse(this.sync);
}
}
// used by internal worker thread
internal void EnqueueReceived(Item item)
{
lock (this.sync)
{
this.receiveQueue.Enqueue(item);
this.receiveAvailable.Set();
// TODO: Should only notify the 'Receive()' method.
Monitor.Pulse(this.sync);
}
}
}
旁注:在python中,我的要求可以使用a threading.Condition
(忽略CancellationToken
)。在 C# 中可能有类似的构造?
class Socket(object):
def __init__(self):
self.sync = threading.RLock()
self.receive_queue = collections.deque()
self.send_ready = threading.Condition(self.sync)
self.receive_ready = threading.Condition(self.sync)
def send(self, item):
with self.send_ready:
// send the message
self.send_ready.wait()
def receive(self):
with self.receive_ready:
try:
return self.receive_queue.popleft()
except IndexError:
self.receive_ready.wait()
return self.receive_queue.popleft()
def notify_sent(self):
with self.sync:
self.send_ready.notify()
def enqueue_received(self, item):
with self.sync:
self.receive_queue.append(item)
self.receive_ready.notify()
解决方案
您正在寻找的是条件变量,它不会直接在任何 .NET API 中公开。这Monitor
是与您正在寻找的最接近的内置类型,它是与单个条件变量相结合的互斥锁。
在 .NET 中解决此问题的标准方法是在继续之前始终重新检查条件(在等待端)。这也是处理虚假唤醒所必需的,这可能发生在所有基于条件变量的解决方案中。
因此:
// Note: 'while', not 'if'
while (!this.receiveAvailable.WaitOne(0))
{
Monitor.Wait(this.sync);
}
等等。
在 .NET 中,由于没有条件变量,因此与指定条件相比,虚假唤醒会更多,但即使在指定条件场景中,也可能发生虚假唤醒。
推荐阅读
- tensorflow - 如何使用 tensorflow 联合进行数据扩充?
- javascript - 如何将 UnityEngine.UI.Button 转换为字符串
- java - 将 for 循环转换为 hashmap 打印的方法签名
- php - 如何从 Laravel API 中的集合中返回单个对象
- java - 如何在springboot中返回一个json结果
- javascript - 使用 Leaflet 计算分数缩放
- python - 可散列、可调用数据类的 Python 类型提示
- javascript - 检测过滤后是否更改列表 - Javascript
- android - 如何计算往返行程和运行测试是否需要 3.5mm 适配器
- mysql - MySQL选择自上一次出现一个月的某一天以来的所有行