c# - 运行并行的、IO 绑定的操作,这些操作以“随机”间隔重复,最大并发性有限制
问题描述
我正在尝试为以下情况提出一个“好的”设计:
- 我有一组独特的、固有的 IO 绑定操作。它们恰好是数据库操作。
- 集合中的每个成员都必须无限重复。
- 每次成员执行时,它都会进行一些内部计算,以确定在再次执行之前等待多长时间。这是一种“自动调整”,以防止不必要地用查询敲击数据库。
- 集合中可能有多达几十个成员,但永远不会接近像 100 或更多这样的大量成员
- 我想将可以并行运行的数量限制为某个“合理”的数量,比如 4。
- 下一次执行时间的精度可能很低,不必严格遵守。
我知道,原则上,这种设置或一次运行 4 个超过一组可能意味着操作的执行可能“落后”,例如当许多操作想要非常快速地重复时,但只有 4 个可以在同时。但是,您可以认为总是存在相对不活动的时期,在这些时期中,操作将表明在需要再次运行之前存在显着延迟,从而允许其他成员“赶上”。
我有一个“有效”的解决方案,但考虑到我对异步和线程的普遍无知,我确信存在更好的方法。我对此的研究很快就爆发出令人眼花缭乱的使用SemaphoreSlim
、自定义TaskScheduler
实现等选项。
真正让我震惊的是,我能找到的几乎所有示例似乎都假设了对WaitAll
样式语义的渴望,其中一些任务队列根据最大并行度并行排空,直到它为空。然而,在我的情况下,操作需要不断地循环到“队列”中,并且只在给定的经过一段时间后执行。
那么,我的问题是,作为概念证明的代码是否糟糕到危险,或者仅仅是它使用的模式很糟糕,或者只是在性能方面很糟糕。最后一个我最不关心的,因为任何数据库操作都可能在几秒钟内完成,所以在控制代码中花费几毫秒并不重要。但是,如果存在相对易于理解和更有效的解决方案,我当然不想在愚蠢的情况下效率低下。
对于我自己的启发,我特别好奇下面的模式是否会导致大量的上下文切换或其他类似的开销。
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;
namespace ConsoleApp2
{
class Worker
{
private CancellationToken ct;
public int id; // just something to provide meaningful console output
public bool working;
public DateTime nextExecutionTime;
public Worker(int id, CancellationToken ct)
{
this.ct = ct;
this.id = id;
this.working = false;
}
public async Task DoWorkAsync()
{
int delay = 0;
try
{
working = true;
Console.WriteLine($"executing task {id}");
delay = (int)nextExecutionTime.Subtract(DateTime.Now).TotalMilliseconds;
delay = delay < 0 ? 0 : delay;
// wait until it's time to "hit the database"
await Task.Delay(delay, ct);
// run inherently IO based operation, eg a database query, simulated here with delay
await Task.Delay(1000 + delay, ct);
// simulate calculated delay until next execution - this value actually provided as output from the stored procedure doing the work
nextExecutionTime = DateTime.Now.AddSeconds(new Random().Next() % 10);
}
catch (TaskCanceledException) { }
finally { working = false; }
}
}
class Program
{
private async static void RunWorkers(List<Worker> runnables, CancellationToken ct)
{
var running = new List<Task>();
// any random 4 to start off, it doesn't matter which
running.AddRange(runnables.Take(4).Select(r => r.DoWorkAsync()));
while (!ct.IsCancellationRequested)
{
Task t = await Task.WhenAny(running);
running.Remove(t);
// this seems like a very inefficient way to handle scheduling
Worker next = runnables.Where(r => !r.working).OrderBy(r => r.nextExecutionTime).First();
running.Add(next.DoWorkAsync());
}
}
static void Main(string[] args)
{
List<Worker> runnables = new List<Worker>();
var cts = new CancellationTokenSource();
for(int i = 0; i < 20; i++)
{
runnables.Add(new Worker(i, cts.Token));
}
Task t = Task.Run(() => RunWorkers(runnables, cts.Token));
Console.ReadKey();
cts.Cancel();
t.GetAwaiter().GetResult();
}
}
}
解决方案
您的实现不是线程安全的。该问题与变量无关,List<Task> running
因为尽管它被不同的线程改变,但它由单个异步工作流访问,并且TPL 会在异步工作流切换线程时注意添加适当的内存屏障。问题与字段bool working
和相关DateTime nextExecutionTime
,可以由多个线程在没有同步的情况下并行访问。这可能会导致严重的问题,例如Worker
同时安排多次。我的建议是将所有读取和改变working
andnextExecutionTime
字段的控制代码从DoWorkAsync
方法移到中央RunWorkers
方法,以摆脱不需要的并行性。这可能需要将DoWorkAsync
方法的结果类型从Task
更改为Task<Worker>
,以便中央工作流知道哪个工作人员刚刚完成,并相应地改变其字段。
另一个潜在的问题是使用DateTime
s 来控制重新调度。不能保证系统Now
属性总是向前发展。可以自动或手动调整并向后移动,从而对 s 的调度造成各种奇怪/意外的影响Worker
。要解决此问题,您可以考虑将DateTime
基于 - 的调度替换为基于 - 的调度TimeSpan
,使用作为测量设备 a Stopwatch
。
关于Task.WhenAny
-in-a-loop 模式的低效率,如果您有大约 1000 个或更多并发运行的任务,这将成为一个考虑因素。在这种情况下,问题将非常严重,因为开销与任务数量不是线性相关,而是二次相关。对于少于 100 个任务,我认为你不应该担心它,考虑到任何替代方法(比如使用 a PrioritySemaphore
)都会复杂得多。
使用OrderBy
运算符也是如此。使用像MoreLinqMinBy
库中的O(N) 运算符会更有效,但任何性能优势很可能都可以忽略不计。
推荐阅读
- python - 使用 manylinux + auditwheel pip Wheels 与 Conda 打包
- sql - 从 Oracle 表生成插入
- rust - 将借来的值保存在集合中的 Rust 方法是什么?
- apache-kafka - Kafka Streams - 如何更好地控制内部创建的状态存储主题的分区?
- git - GIT - 致命:不能使用 .idea/ 作为排除文件
- jquery - 如果用户名和电子邮件都可用,如何显示按钮
- java - FireFox 无法在 ws://1.1.1.26:81 建立与服务器的连接
- inno-setup - 将包含引号的命令行参数传递给安装程序
- linux - 复制名称中包含偶数的文件 - bash
- sql-server - SELECT 语句上的 SQL Server 消息 511 错误