首页 > 解决方案 > 加权公平排队算法仅使用 2 个数据包流进行循环

问题描述

我尝试实现加权公平排队算法的简单模拟。

但是我注意到一个问题,当只使用 2 个数据包流时,该算法并没有让 T1 数据包的数量达到应有的两倍,因为它们是 T2 数据包大小的一半。相反,它只是在 2 个流之间进行循环。

我理解问题是 T1 需要一些时间才能将另一个数据包添加到队列中,并且当 T1 队列为空时,加权公平队列有时间运行。

这似乎不是预期的行为,我是否在算法中遗漏了一些可以解决这种极端情况的东西?

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
    public class Packet
    {
        public string name;
        public ManualResetEventSlim mres;

        public long virFinish;
        public long packetSize;
        public Packet(string pname, long p_virFinish, long p_packetSize)
        {
            name = pname;
            mres = new ManualResetEventSlim(false);
            virFinish = p_virFinish;
            packetSize = p_packetSize;
        }
    }

    public class Queue
    {
        public ConcurrentQueue<Packet> q = new ConcurrentQueue<Packet>();
        public long virStart;
        public long lastVirFinish;
    }

    public class FairQueue
    {
        long VirtualTime; // system virtual time
        ConcurrentDictionary<string, Queue> queues = new ConcurrentDictionary<string, Queue>();
        public FairQueue() {
            VirtualTime = long.MinValue;
            queues.GetOrAdd("t1", new Queue());
            queues.GetOrAdd("t2", new Queue());
            queues.GetOrAdd("t3", new Queue());
        }

        public ManualResetEventSlim Write(string queueName, long packetSize)
        {
            var queue = queues[queueName];
            Volatile.Write(ref queue.virStart, Math.Max(VirtualTime, queue.lastVirFinish));
            var packetFinish = queue.virStart + packetSize;
            var packet = new Packet(queueName, packetFinish, packetSize);
            queue.q.Enqueue(packet);


            Volatile.Write(ref queue.lastVirFinish, packetFinish);

            UpdateVirtualClock(queue);

            return packet.mres;
        }

        private void UpdateVirtualClock(Queue queue)
        {
            long minStart = queue.virStart;
            foreach (var item in queues)
            {
                if (!item.Value.q.IsEmpty)
                {
                    minStart = Math.Min(item.Value.virStart, minStart);
                }
            }
            Volatile.Write(ref VirtualTime, Math.Max(VirtualTime, minStart));
        }


        // return queue with smallest lastVirFinish
        public Queue SelectQueue()
        {
            long minVirFinish = long.MaxValue; // infinity
            Queue selected = null;
            foreach (var queue in queues)
            {

                if (!queue.Value.q.IsEmpty)
                {
                    var lastVirFinish = Volatile.Read(ref queue.Value.lastVirFinish);
                    if (lastVirFinish < minVirFinish) {
                        minVirFinish = queue.Value.lastVirFinish;
                        selected = queue.Value;
                    }
                }
            }
            return selected;
        }

        public Packet Send()
        {
            var selectedQueue = SelectQueue();
            if (selectedQueue != null)
            {
                Packet p;
                selectedQueue.q.TryDequeue(out p);
                /* Set the start and the finish times of the remaining packets in the queue */
                if (!selectedQueue.q.IsEmpty)
                {
                    var next = selectedQueue.q.Take(1).First();
                    selectedQueue.virStart = selectedQueue.lastVirFinish;
                    selectedQueue.lastVirFinish = selectedQueue.virStart + next.packetSize;
                }
                return p;
            }
            else
            {
                return null;
            }
        }


    }


    class Program
    {

        static void Main(string[] args)
        {
            FairQueue fq = new FairQueue();

            // Task that enqueue packet
            var t1 = Task.Factory.StartNew(
             () => {
                 while (true)
                 {
                     System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
                     sw.Start();
                     var s1 = fq.Write("t1", 100);
                     s1.Wait();
                     sw.Stop();
                 }
             }
            );

            // Task that enqueue packet
            var t2 = Task.Factory.StartNew(
             () => {
                 while (true)
                 {
                     System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
                     sw.Start();
                     var s1 = fq.Write("t2", 200);
                     s1.Wait();
                     sw.Stop();
                 }
             }
            );

            // Loop that run the Weighted Fair Queue Algorithm
            while (true)
            {
                var toSend = fq.Send();
                if (toSend != null)
                {
                    Console.WriteLine(toSend.name);
                    Thread.Sleep((int)toSend.packetSize);
                    toSend.mres.Set();
                }
            }
        }
    }
}

标签: algorithmnetworkingnetwork-programmingqueue

解决方案


推荐阅读