首页 > 解决方案 > 尝试测试 FIFO 互斥锁 - 如果我在一个循环中启动测试线程,它不起作用,但如果我间隔 1 毫秒启动它们,它确实有效

问题描述

我一直在使用这个答案中的排队锁定代码,并为它编写了一个单元测试。供参考,锁码:

public sealed class FifoMutex
{
    private readonly object innerLock = new object();
    private volatile int ticketsCount = 0;
    private volatile int ticketToRide = 1;
    private readonly ThreadLocal<int> reenter = new ThreadLocal<int>();

    public void Enter()
    {
        reenter.Value++;
        if (reenter.Value > 1)
            return;
        int myTicket = Interlocked.Increment(ref ticketsCount);
        Monitor.Enter(innerLock);
        while (true)
        {
            if (myTicket == ticketToRide)
            {
                return;
            }
            else
            {
                Monitor.Wait(innerLock);
            }
        }
    }

    public void Exit()
    {
        if (reenter.Value > 0)
            reenter.Value--;
        if (reenter.Value > 0)
            return;
        Interlocked.Increment(ref ticketToRide);
        Monitor.PulseAll(innerLock);
        Monitor.Exit(innerLock);
    }
}

还有我的测试代码:

[TestClass]
public class FifoMutexTests
{
    public static ConcurrentQueue<string> Queue;

    [TestInitialize]
    public void Setup()
    {
        Queue = new ConcurrentQueue<string>();
    }

    [TestCleanup]
    public void TearDown()
    {
        Queue = null;
    }

    [TestMethod]
    public void TestFifoMutex()
    {
        int noOfThreads = 10;
        int[] threadSleepTimes = new int[noOfThreads];
        string[] threadNames = new string[noOfThreads];
        Random r = new Random();
        for (int i = 0; i < noOfThreads; i++)
        {
            threadSleepTimes[i] = r.Next(0, 250);
            threadNames[i] = "Thread " + i;
        }

        for (int i = 0; i < noOfThreads; i++)
        {
            FifoMutexTestUser user = new FifoMutexTestUser();
            Thread newThread = new Thread(user.DoWork);
            newThread.Name = threadNames[i];
            newThread.Start(threadSleepTimes[i]);
        }
        Thread.Sleep(3000);

        var receivedThreadNamesInOrder = Queue.ToArray();
        Assert.AreEqual(threadNames.Length, receivedThreadNamesInOrder.Length);
        for (int i = 0; i < receivedThreadNamesInOrder.Length; i++)
        {
            Assert.AreEqual(threadNames[i], receivedThreadNamesInOrder[i]);
        }
    }
}

使用这个测试互斥用户:

public class FifoMutexTestUser
{
    private readonly static FifoMutex fifoMutex = new FifoMutex();

    public void DoWork(object sleepTime)
    {
        try
        {
            fifoMutex.Enter();
            Thread.Sleep((int)sleepTime);
            FifoMutexTests.Queue.Enqueue(Thread.CurrentThread.Name);
        }
        finally
        {
            fifoMutex.Exit();
        }
    }
}

本质上,我正在创建十个线程,每个线程都会随机休眠一段时间,然后将它们的名称排入主测试类的静态并发队列中。线程是从同一用户类的不同实例构建的,该类具有静态 fifo 互斥属性。该场景类似于我自己的用例(我有多个消费者类从不同的地方接收消息,我需要我的后端严格按顺序处理它们,但也严格按照它们到达的顺序)。

但是这个测试不起作用。所有线程都将其所有名称排入队列,但顺序不正确。从第二个片段的最后一个 for 循环中,我读到它们实际上是按随机顺序执行的,这正是 fifo 互斥锁要防止的。

但事情就是这样。对我的测试代码进行了一次小的调整,这一切都像一个魅力。

        for (int i = 0; i < noOfThreads; i++)
        {
            FifoMutexTestUser user = new FifoMutexTestUser();
            Thread newThread = new Thread(user.DoWork);
            Thread.Sleep(1);
            newThread.Name = threadNames[i];
            newThread.Start(threadSleepTimes[i]);
        }

现在我在启动所有线程的循环(第二个片段的第二个循环)中睡了一毫秒,这是可能的最小间隔。如果我这样做,那么我所有的线程都以正确的顺序将它们的名称排入队列,并且我的测试 100% 的时间成功。

所以我想知道为什么这么短的睡眠时间会有所作为。我对编译不是很了解,但我的第一个猜测是编译器正在编译或优化启动所有线程的循环,并且在该过程中线程的顺序发生了变化?

或者,(可能更可能的)替代方案是我的测试代码(或互斥代码)有问题吗?

标签: c#unit-testingconcurrencymutexvs-unit-testing-framework

解决方案


似乎(如果我正确理解问题)您假设线程将实际启动(在这种情况下将执行)并以您调用它们DoWork的相同顺序获取互斥锁。Thread.Start然而,这不是(必要的)情况。

假设您有 10 个线程(“id”从 1 到 10),然后Thead.Start按顺序调用它们 - 这并不意味着它们实际上会按该顺序启动。您在线程 1 上调用 start,然后在线程 2 上调用 start,然后DoWork线程 2(不是 1)可能首先执行。您可以通过以下方式更改测试代码来观察这一点:

public class FifoMutexTestUser {
    private readonly int _id;
    public FifoMutexTestUser(int id) {
        _id = id;
    }
    private readonly static FifoMutex fifoMutex = new FifoMutex();

    public void DoWork(object sleepTime)
    {
        Console.WriteLine("Thread started: " + _id);
        try
        {
            fifoMutex.Enter();
            Thread.Sleep((int)sleepTime);
            FifoMutexTests.Queue.Enqueue(Thread.CurrentThread.Name);
        }
        finally
        {
            fifoMutex.Exit();
        }
    }
}

然后在那里传递循环变量(与您threadNames执行断言相关):

for (int i = 0; i < noOfThreads; i++)
{
    FifoMutexTestUser user = new FifoMutexTestUser(i);
    Thread newThread = new Thread(user.DoWork);
    newThread.Name = threadNames[i];
    newThread.Start(threadSleepTimes[i]);
}

你可以看到这样的东西(结果当然可能会有所不同):

Thread started: 9
Thread started: 1
Thread started: 0
Thread started: 2
Thread started: 3
Thread started: 4
Thread started: 5
Thread started: 6
Thread started: 7
Thread started: 8

因此,在这次运行中,您上次调用的线程Thread.Start实际上是首先启动的。但更重要的是 - 如果线程首先启动(通过启动我们的意思是这里DoWork开始执行) - 这并不意味着它会首先获取互斥锁,因为线程并行执行并且代码在和之外fifoMutex.EnterfifoMutex.Exit以及在互斥锁之前和之后的那些函数内部)实际上被获取\释放)不受任何同步结构的保护 - 任何线程都可以先获取互斥锁。

有时(并非总是)添加延迟会给先前(循环中)线程带来优势,因此它有更多机会首先实际获取互斥锁。如果您很幸运,线程试图以正确的顺序获取互斥锁,那么您FifoMutex可以确保它们将按该顺序解除阻塞。但是他们获取互斥锁的顺序在你的测试代码中是不确定的。


推荐阅读