首页 > 解决方案 > 准确模拟消息延迟

问题描述

我编写了一个简单的“延迟模拟器”,它可以工作,但有时消息的延迟时间超过了指定的时间。我需要帮助以确保消息延迟正确的时间。

我相信主要问题是我正在使用Thread.Sleep(x),这取决于各种因素,但主要取决于时钟中断率,这导致Thread.Sleep()分辨率约为 15ms。此外,密集型任务将需要更多的 CPU 时间,并且有时会导致延迟大于请求的延迟。如果您不熟悉 的解决问题Thread.Sleep,可以阅读这些 SO 帖子:此处此处此处

这是我的LatencySimulator

public class LatencySimulatorResult: EventArgs
{
    public int messageNumber { get; set; }
    public byte[] message { get; set; }
}

public class LatencySimulator
{
    private int messageNumber;
    private int latency = 0;
    private int processedMessageCount = 0;

    public event EventHandler messageReady;

    public void Delay(byte[] message, int delay)
    {
        latency = delay;

        var result = new LatencySimulatorResult();
        result.message = message;
        result.messageNumber = messageNumber;

        if (latency == 0)
        {
            if (messageReady != null)
                messageReady(this, result);
        }
        else
        {
            ThreadPool.QueueUserWorkItem(ThreadPoolCallback, result);
        }
        Interlocked.Increment(ref messageNumber);
    }

    private void ThreadPoolCallback(object threadContext)
    {
        Thread.Sleep(latency);
        var next = (LatencySimulatorResult)threadContext;

        var ready = next.messageNumber == processedMessageCount + 1;
        while (ready == false)
        {
            ready = next.messageNumber == processedMessageCount + 1;
        }

        if (messageReady != null)
            messageReady(this, next);

        Interlocked.Increment(ref processedMessageCount);
    }
}

要使用它,您需要创建一个新实例并绑定到事件处理程序:

var latencySimulator = new LatencySimulator();
latencySimulator.messageReady += MessageReady;

然后,您调用latencySimulator.Delay(someBytes, someDelay); 当消息完成延迟时,将触发该事件,然后您可以处理延迟的消息。

保持添加消息的顺序很重要。我不能让它们以某种随机顺序从延迟模拟器的另一端出来。

这是一个使用延迟模拟器并查看消息延迟多长时间的测试程序:

private static LatencySimulator latencySimulator;
private static ConcurrentDictionary<int, PendingMessage> pendingMessages;
private static List<long> measurements;

static void Main(string[] args)
{
    var results = TestLatencySimulator();
    var anomalies = results.Result.Where(x=>x > 32).ToList();
    foreach (var result in anomalies)
    {
        Console.WriteLine(result);
    }

    Console.ReadLine();
}

static async Task<List<long>> TestLatencySimulator()
{
    latencySimulator = new LatencySimulator();
    latencySimulator.messageReady += MessageReady;
    var numberOfMeasurementsMax = 1000;
    pendingMessages = new ConcurrentDictionary<int, PendingMessage>();
    measurements = new List<long>();

    var sendTask = Task.Factory.StartNew(() =>
    {
        for (var i = 0; i < numberOfMeasurementsMax; i++)
        {
            var message = new Message { Id = i };
            pendingMessages.TryAdd(i, new PendingMessage() { Id = i });
            latencySimulator.Delay(Serialize(message), 30);
            Thread.Sleep(50);
        }
    });

    //Spin some tasks up to simulate high CPU usage
    Task.Factory.StartNew(() => { FindPrimeNumber(100000); });
    Task.Factory.StartNew(() => { FindPrimeNumber(100000); });
    Task.Factory.StartNew(() => { FindPrimeNumber(100000); });

    sendTask.Wait();

    return measurements;
}

static long FindPrimeNumber(int n)
{
    int count = 0;
    long a = 2;
    while (count < n)
    {
        long b = 2;
        int prime = 1;// to check if found a prime
        while (b * b <= a)
        {
            if (a % b == 0)
            {
                prime = 0;
                break;
            }
            b++;
        }
        if (prime > 0)
        {
            count++;
        }
        a++;
    }
    return (--a);
}

private static void MessageReady(object sender, EventArgs e)
{
    LatencySimulatorResult result = (LatencySimulatorResult)e;

    var message = (Message)Deserialize(result.message);
    if (pendingMessages.ContainsKey(message.Id) != true) return;

    pendingMessages[message.Id].stopwatch.Stop();
    measurements.Add(pendingMessages[message.Id].stopwatch.ElapsedMilliseconds);
}

static object Deserialize(byte[] arrBytes)
{
    using (var memStream = new MemoryStream())
    {
        var binForm = new BinaryFormatter();
        memStream.Write(arrBytes, 0, arrBytes.Length);
        memStream.Seek(0, SeekOrigin.Begin);
        var obj = binForm.Deserialize(memStream);
        return obj;
    }
}

static byte[] Serialize<T>(T obj)
{
    BinaryFormatter bf = new BinaryFormatter();
    using (var ms = new MemoryStream())
    {
        bf.Serialize(ms, obj);
        return ms.ToArray();
    }
}

如果您运行此代码,您将看到大约 5% 的消息延迟超过预期的 30 毫秒。事实上,有些高达60ms。没有任何后台任务或高 CPU 使用率,模拟器会按预期运行。

我需要它们都为 30 毫秒(或尽可能接近)——我不想要一些任意的 50-60 毫秒延迟。

任何人都可以建议我如何重构这段代码,以便我可以达到预期的结果,但不使用Thread.Sleep()CPU 开销并且尽可能少地使用 CPU 开销?

标签: c#thread-sleep

解决方案


推荐阅读