首页 > 解决方案 > 运行并行的、IO 绑定的操作,这些操作以“随机”间隔重复,最大并发性有限制

问题描述

我正在尝试为以下情况提出一个“好的”设计:

我知道,原则上,这种设置或一次运行 4 个超过一组可能意味着操作的执行可能“落后”,例如当许多操作想要非常快速地重复时,但只有 4 个可以在同时。但是,您可以认为总是存在相对不活动的时期,在这些时期中,操作将表明在需要再次运行之前存在显着延迟,从而允许其他成员“赶上”。

我有一个“有效”的解决方案,但考虑到我对异步和线程的普遍无知,我确信存在更好的方法。我对此的研究很快就爆发出令人眼花缭乱的使用SemaphoreSlim、自定义TaskScheduler实现等选项。

真正让我震惊的是,我能找到的几乎所有示例似乎都假设了对WaitAll样式语义的渴望,其中一些任务队列根据最大并行度并行排空,直到它为空。然而,在我的情况下,操作需要不断地循环到“队列”中,并且只在给定的经过一段时间后执行。

那么,我的问题是,作为概念证明的代码是否糟糕到危险,或者仅仅是它使用的模式很糟糕,或者只是在性能方面很糟糕。最后一个我最不关心的,因为任何数据库操作都可能在几秒钟内完成,所以在控制代码中花费几毫秒并不重要。但是,如果存在相对易于理解和更有效的解决方案,我当然不想在愚蠢的情况下效率低下。

对于我自己的启发,我特别好奇下面的模式是否会导致大量的上下文切换或其他类似的开销。

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;

namespace ConsoleApp2
{
    class Worker
    {
        private CancellationToken ct;
        public int id;          // just something to provide meaningful console output 
        public bool working;
        public DateTime nextExecutionTime; 
        public Worker(int id, CancellationToken ct) 
        { 
            this.ct = ct; 
            this.id = id;
            this.working = false;
            
        }
        public async Task DoWorkAsync()
        {
            int delay = 0;
            try
            {
                working = true;
                Console.WriteLine($"executing task {id}");
                delay = (int)nextExecutionTime.Subtract(DateTime.Now).TotalMilliseconds;
                delay = delay < 0 ? 0 : delay;
                // wait until it's time to "hit the database"
                await Task.Delay(delay, ct); 
                // run inherently IO based operation, eg a database query, simulated here with delay
                await Task.Delay(1000 + delay, ct);
                // simulate calculated delay until next execution - this value actually provided as output from the stored procedure doing the work
                nextExecutionTime = DateTime.Now.AddSeconds(new Random().Next() % 10);  
            }
            catch (TaskCanceledException) { }
            finally { working = false; }
        }
    }

    class Program
    {
        private async static void RunWorkers(List<Worker> runnables, CancellationToken ct)
        {
            var running = new List<Task>();
            // any random 4 to start off, it doesn't matter which
            running.AddRange(runnables.Take(4).Select(r => r.DoWorkAsync())); 
            while (!ct.IsCancellationRequested)
            {
                Task t = await Task.WhenAny(running);
                running.Remove(t);
                // this seems like a very inefficient way to handle scheduling
                Worker next = runnables.Where(r => !r.working).OrderBy(r => r.nextExecutionTime).First();
                running.Add(next.DoWorkAsync());
            }
        }

        static void Main(string[] args)
        {
            List<Worker> runnables = new List<Worker>();
            var cts = new CancellationTokenSource();
            for(int i = 0; i < 20; i++) 
            { 
                runnables.Add(new Worker(i, cts.Token)); 
            }
            Task t = Task.Run(() => RunWorkers(runnables, cts.Token));
            Console.ReadKey();
            cts.Cancel();
            t.GetAwaiter().GetResult();
        }
    }
}

标签: c#concurrencyasync-await

解决方案


您的实现不是线程安全的。该问题与变量无关,List<Task> running因为尽管它被不同的线程改变,但它由单个异步工作流访问,并且TPL 会在异步工作流切换线程时注意添加适当的内存屏障。问题与字段bool working和相关DateTime nextExecutionTime,可以由多个线程在没有同步的情况下并行访问。这可能会导致严重的问题,例如Worker同时安排多次。我的建议是将所有读取和改变workingandnextExecutionTime字段的控制代码从DoWorkAsync方法移到中央RunWorkers方法,以摆脱不需要的并行性。这可能需要将DoWorkAsync方法的结果类型从Task更改为Task<Worker>,以便中央工作流知道哪个工作人员刚刚完成,并相应地改变其字段。

另一个潜在的问题是使用DateTimes 来控制重新调度。不能保证系统Now属性总是向前发展。可以自动或手动调整并向后移动,从而对 s 的调度造成各种奇怪/意外的影响Worker。要解决此问题,您可以考虑将DateTime基于 - 的调度替换为基于 - 的调度TimeSpan,使用作为测量设备 a Stopwatch

关于Task.WhenAny-in-a-loop 模式的低效率,如果您有大约 1000 个或更多并发运行的任务,这将成为一个考虑因素。在这种情况下,问题将非常严重,因为开销与任务数量不是线性相关,而是二次相关。对于少于 100 个任务,我认为你不应该担心它,考虑到任何替代方法(比如使用 a PrioritySemaphore)都会复杂得多。

使用OrderBy运算符也是如此。使用像MoreLinqMinBy库中的O(N) 运算符会更有效,但任何性能优势很可能都可以忽略不计。


推荐阅读