首页 > 解决方案 > 带有 semaphoreSlim 的模式 WaitAllOneByOne,这个实现线程安全吗?

问题描述

我有两个任务要执行,一个是获取数据,大约需要 2 秒才能完成,另一个是处理数据,大约需要 3 秒。我必须为大约 13.000 个不同的元素运行这两个任务。如果我连续运行此任务,大约需要 18 小时才能完成。

现在我正在考虑使用模式 WaitAllOneByOne ,但是调用Task.Run每个元素都会崩溃我的数据库,所以我想实现一个 SemaphoreSlim 来控制每个任务应该运行多少线程。

到目前为止,我能够创建该模式的实现,但我不知道它是否是线程安全的,而且我对如何测试它一无所知

代码如下:

class Program
{
    static async Task Main(string[] args)
    {
        List<Guid> idsCombinacion = GetIdsCombinacion(10);
        List<Task<string>> todo = new List<Task<string>>(idsCombinacion.Count);
        List<Task> processing = new List<Task>(idsCombinacion.Count);

        //this starts callings to GetData, up to 2 threads.
        SemaphoreSlim semaphoreGetData = new SemaphoreSlim(2);
        var dataTask = GetDataAsync(idsCombinacion, todo, semaphoreGetData);

        SemaphoreSlim semaphoreProcessData = new SemaphoreSlim(3);
        while (todo.Count > 0)
        {
            Task<string>[] tasks = todo.ToArray();
            int idx = Task.WaitAny(tasks);
            todo.RemoveAt(idx);

            //once the tasks in the to-do list goes finishing, we start a new task to perform the Processing, up to 3 threads
            semaphoreProcessData.Wait();
            var result = tasks[idx].Result;
            tasks[idx].Dispose();
            var t = Task.Factory.StartNew(() =>
            {
                Task.Delay(3000).Wait();
                Console.WriteLine($"todo: {todo.Count}; processing: {processing.Count}; guid: {result}");
                semaphoreProcessData.Release();
            }, TaskCreationOptions.LongRunning);
            processing.Add(t);

        }

        var aux = processing.ToArray();
        //we wait for all the jobs to complete
        Task.WaitAll(aux);

        await dataTask;

        semaphoreGetData.Dispose();
        semaphoreProcessData.Dispose();
    }

    private static async Task GetDataAsync(List<Guid> idsCombinacion, List<Task<string>> todo, SemaphoreSlim semaphoreGetData)
    {
        foreach (var idComb in idsCombinacion)
        {
            await semaphoreGetData.WaitAsync();
            var t = Task<string>.Factory.StartNew(() =>
            {
                Task.Delay(2000).Wait();
                semaphoreGetData.Release();
                return "Guid: " + idComb;
            }, TaskCreationOptions.LongRunning);
            todo.Add(t);
        }
    }


    private static List<Guid> GetIdsCombinacion(int howMany)
    {
        var idsCombinacion = new List<Guid>(howMany);
        for (int i = 0; i < howMany; i++)
            idsCombinacion.Add(Guid.NewGuid());
        return idsCombinacion;
    }
}   

Joe Hummel 博士在此复数课程中的 WaitAllOneByOne模式。

标签: c#multithreadingasynchronousthread-safetytask-parallel-library

解决方案


推荐阅读