c# - 带有 semaphoreSlim 的模式 WaitAllOneByOne,这个实现线程安全吗?
问题描述
我有两个任务要执行,一个是获取数据,大约需要 2 秒才能完成,另一个是处理数据,大约需要 3 秒。我必须为大约 13.000 个不同的元素运行这两个任务。如果我连续运行此任务,大约需要 18 小时才能完成。
现在我正在考虑使用模式 WaitAllOneByOne ,但是调用Task.Run
每个元素都会崩溃我的数据库,所以我想实现一个 SemaphoreSlim 来控制每个任务应该运行多少线程。
到目前为止,我能够创建该模式的实现,但我不知道它是否是线程安全的,而且我对如何测试它一无所知。
代码如下:
class Program
{
static async Task Main(string[] args)
{
List<Guid> idsCombinacion = GetIdsCombinacion(10);
List<Task<string>> todo = new List<Task<string>>(idsCombinacion.Count);
List<Task> processing = new List<Task>(idsCombinacion.Count);
//this starts callings to GetData, up to 2 threads.
SemaphoreSlim semaphoreGetData = new SemaphoreSlim(2);
var dataTask = GetDataAsync(idsCombinacion, todo, semaphoreGetData);
SemaphoreSlim semaphoreProcessData = new SemaphoreSlim(3);
while (todo.Count > 0)
{
Task<string>[] tasks = todo.ToArray();
int idx = Task.WaitAny(tasks);
todo.RemoveAt(idx);
//once the tasks in the to-do list goes finishing, we start a new task to perform the Processing, up to 3 threads
semaphoreProcessData.Wait();
var result = tasks[idx].Result;
tasks[idx].Dispose();
var t = Task.Factory.StartNew(() =>
{
Task.Delay(3000).Wait();
Console.WriteLine($"todo: {todo.Count}; processing: {processing.Count}; guid: {result}");
semaphoreProcessData.Release();
}, TaskCreationOptions.LongRunning);
processing.Add(t);
}
var aux = processing.ToArray();
//we wait for all the jobs to complete
Task.WaitAll(aux);
await dataTask;
semaphoreGetData.Dispose();
semaphoreProcessData.Dispose();
}
private static async Task GetDataAsync(List<Guid> idsCombinacion, List<Task<string>> todo, SemaphoreSlim semaphoreGetData)
{
foreach (var idComb in idsCombinacion)
{
await semaphoreGetData.WaitAsync();
var t = Task<string>.Factory.StartNew(() =>
{
Task.Delay(2000).Wait();
semaphoreGetData.Release();
return "Guid: " + idComb;
}, TaskCreationOptions.LongRunning);
todo.Add(t);
}
}
private static List<Guid> GetIdsCombinacion(int howMany)
{
var idsCombinacion = new List<Guid>(howMany);
for (int i = 0; i < howMany; i++)
idsCombinacion.Add(Guid.NewGuid());
return idsCombinacion;
}
}
Joe Hummel 博士在此复数课程中的 WaitAllOneByOne模式。
解决方案
推荐阅读
- javascript - ChartJS - 通过更改 y 轴标签来更改图表类型
- mongodb - 是否可以在使用 Spring mongoDB 的 updateMulti 之后返回更新的结果?
- python - 学习数据科学
- google-oauth - 通过 API 访问 Google 照片的长期令牌?
- ipython - 如何在 IPython 中获取完整的存储历史记录?
- r - 为缺失值制作 x 轴连续 ggplot R
- apache-kafka - 使用格式化程序消费主题`__offset_consumers`然后在kafka中不输出任何内容
- python - 从字符串制作字典的 Pythonic 方式
- angular - Angular async ngOnInit 实例化所需的属性
- python - 将具有非唯一 ID 和列值的数据帧转换为每个唯一 ID 的单行