首页 > 解决方案 > Monitor.Pulse() 有条件

问题描述

我有一个类,应该是线程安全的。我最好希望使用单个同步对象来管理线程安全,以避免复杂的思维导图,因为所有方法都会更改对象状态变量。因此,我在该对象上使用 lock 语句包装方法体。在某些情况下,需要释放锁一段时间才能允许另一个线程更新状态。到目前为止一切顺利,只需使用Monitor.Wait()and Monitor.Pulse()。但是,我想有条件地“脉动”。在下面的代码中,我只想向在“Send()”方法中等待的线程发送“Pulse”。同样,仅向在 'Receive()' 方法中等待的线程发送 'Pulse'。

所以总结一下:

我尝试了很多东西,包括 Monitor、Semaphore 和 WaitHandle 组合、带有 WaitHandles 的队列以及更多创意选项。另外,我一直在玩多个同步对象。但在每种情况下,我只能让部分功能发挥作用。

下面的代码是我得到的最接近的。TODO 注释显示代码有什么问题。

public class Socket
{
    public class Item { }

    private object sync = new object();
    private ManualResetEvent receiveAvailable = new ManualResetEvent(false);

    private Queue<Item> receiveQueue = new Queue<Item>();

    // used by client, from any thread
    public void Send(Item item, CancellationToken token)
    {
        lock (this.sync)
        {
            // sends the message somewhere and should await confirmation.
            // note that the confirmation order matters.

            // TODO: Should only continue on notification from 'NotifySent()', and respect the cancellation token
            Monitor.Wait(this.sync); 
        }
    }

    // used by client, from any thread
    public Item Receive(CancellationToken token)
    {
        lock (this.sync)
        {
            if (!this.receiveAvailable.WaitOne(0))
            {
                // TODO: Should only be notified by 'EnqueueReceived()' method, and respect the cancellation token.
                Monitor.Wait(this.sync);
            }

            var item = this.receiveQueue.Dequeue();
            if (this.receiveQueue.Count == 0)
            {
                this.receiveAvailable.Reset();
            }

            return item;
        }
    }

    // used by internal worker thread
    internal void NotifySent()
    {
        lock (this.sync)
        {
            // Should only notify the Send() method.
            Monitor.Pulse(this.sync);
        }
    }

    // used by internal worker thread
    internal void EnqueueReceived(Item item)
    {
        lock (this.sync)
        {
            this.receiveQueue.Enqueue(item);
            this.receiveAvailable.Set();

            // TODO: Should only notify the 'Receive()' method.
            Monitor.Pulse(this.sync);
        }
    }
}

旁注:在python中,我的要求可以使用a threading.Condition(忽略CancellationToken)。在 C# 中可能有类似的构造?

class Socket(object):
    def __init__(self):
        self.sync = threading.RLock()
        self.receive_queue = collections.deque()
        self.send_ready = threading.Condition(self.sync)
        self.receive_ready = threading.Condition(self.sync)

    def send(self, item):
        with self.send_ready:
            // send the message
            self.send_ready.wait()

    def receive(self):
        with self.receive_ready:
            try:
                return self.receive_queue.popleft()
            except IndexError:
                self.receive_ready.wait()
            return self.receive_queue.popleft()

    def notify_sent(self):
        with self.sync:
            self.send_ready.notify()

    def enqueue_received(self, item):
        with self.sync:
            self.receive_queue.append(item)
            self.receive_ready.notify()

标签: c#multithreading

解决方案


您正在寻找的是条件变量,它不会直接在任何 .NET API 中公开。这Monitor是与您正在寻找的最接近的内置类型,它是与单个条件变量相结合的互斥锁。

在 .NET 中解决此问题的标准方法是在继续之前始终重新检查条件(在等待端)。这也是处理虚假唤醒所必需的,这可能发生在所有基于条件变量的解决方案中

因此:

// Note: 'while', not 'if'
while (!this.receiveAvailable.WaitOne(0))
{
  Monitor.Wait(this.sync);
}

等等。

在 .NET 中,由于没有条件变量,因此与指定条件相比,虚假唤醒会更多,但即使在指定条件场景中,也可能发生虚假唤醒。


推荐阅读