首页 > 解决方案 > 同时处理数千个数据库调用

问题描述

这个小控制台应用程序是进行数千次数据库调用的概念证明。这个想法是我们希望许多呼叫同时发生。在开始下一个呼叫之前,无需等待一个呼叫完成。

起初,这(见下文)似乎是一个很好的方法,但是当我们将它应用到实际的数据库调用时,我们看到的是它似乎堆叠了进程。意思是,它启动了所有这些,但没有一个完成,直到所有这些都启动。

我希望(并希望)一些电话在其他电话开始之前完成。但情况似乎并非如此。

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("starting");
        DatabaseCallsAsync().Wait();
        Console.WriteLine("ending"); // Must not fire until all database calls are complete.
        Console.Read();
    }

    static async Task DatabaseCallsAsync()
    {
        List<int> inputParameters = new List<int>();
        for (int i = 0; i < 100; i++)
        {
            inputParameters.Add(i);
        }
        await Task.WhenAll(inputParameters.Select(x => DatabaseCallAsync($"Task {x}")));
    }

    static async Task DatabaseCallAsync(string taskName)
    {
        Console.WriteLine($"{taskName}: start");
        await Task.Delay(1000);
        Console.WriteLine($"{taskName}: finish");
    }
}

如何调整它以使某些呼叫完成而不等待所有呼叫开始?

标签: c#.netasynchronous

解决方案


重要的是要注意几件事:

  1. async方法开始同步运行。魔法发生在await,如果await给出一个不完整的Task
  2. 异步!=并行。异步运行某些东西只是让线程在等待某个地方的回复时去做其他事情。这并不意味着同时发生多种事情。

考虑到这些事情,当它遍历您创建的所有任务时,这就是您的案例中发生的事情:

  1. 所有的任务都放在“待办事项”清单上。
  2. 任务 1 已启动。
  3. 在处await,返回一个不完整Task的,并将该方法的其余部分放在“待办事项”列表中。
  4. 线程意识到无事可做,然后继续“待办事项”列表中的下一件事,即开始下一个Task

在第 4 步,“待办事项”列表中的下一件事将始终是列表中的下一个Task,直到列表中没有任何内容为止。只有这样,“待办事项”列表上的下一件事就是按照完成的顺序继续已完成的任务。

所有这些都发生在同一个线程上:它是异步的,而不是并行的。

但!如果您实际使用 SQL 调用(并且您为每个任务建立一个新连接,因为单个连接一次只能运行一个查询 - 除非您启用Multiple Active Result Sets)并监控 SQL,您将看到这些调用传入和可能在所有这些都开始之前完成,因为 SQL 并行运行查询。只是C# 方法的延续在所有任务都开始之前不会开始

如果您真的希望并行运行这些,那么您需要多线程。您可以查看(此处Parallel.ForEach的示例),但这不是异步的。它将为每个实例创建一个线程,并且该线程将阻塞直到完成。这在桌面应用程序中没什么大不了的,但在 ASP.NET 中,线程是有限的,因此您需要小心。

这里有一个很大的讨论但我特别喜欢这个答案,它不是多线程,但提供了一种限制你的任务的方法。因此,您可以告诉它启动x任务数,并在每个任务完成后启动下一个任务,直到所有任务都运行完毕。对于您的代码,它看起来像这样(一次运行 10 个任务):

static async Task DatabaseCallsAsync()
{
    List<int> inputParameters = new List<int>();
    for (int i = 0; i < 100; i++)
    {
        inputParameters.Add(i);
    }

    await RunWithMaxDegreeOfConcurrency(10, inputParameters, x => DatabaseCallAsync($"Task {x}"));
}

static async Task DatabaseCallAsync(string taskName)
{
    Console.WriteLine($"{taskName}: start");
    await Task.Delay(1000);
    Console.WriteLine($"{taskName}: finish");
}

public static async Task RunWithMaxDegreeOfConcurrency<T>(
     int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    {
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            await Task.WhenAny(activeTasks.ToArray());
            //observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted); 
        }
    }
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    {
        //observe exceptions in a manner consistent with the above   
    });
}

推荐阅读