首页 > 解决方案 > 如何按顺序执行任意数量的异步任务?

问题描述

我有这个功能:

async Task RefreshProfileInfo(List<string> listOfPlayers)

// For each player in the listOfPlayers, checks an in-memory cache if we have an entry.
// If we have a cached entry, do nothing.
// If we don't have a cached entry, fetch from backend via an API call.

这个函数被非常频繁地调用,比如:

await RefreshProfileInfo(playerA, playerB, playerC)

或者

await RefreshProfileInfo(playerB, playerC, playerD) 

或者

await RefreshProfileInfo(playerE, playerF)

理想情况下,如果玩家不相互重叠,则调用不应相互影响(请求 PlayerE 和 PlayerF 不应阻止对 PlayerA、PlayerB、PlayerC 的请求)。但是,如果玩家确实相互重叠,则第二个调用应该等待第一个调用(请求 PlayerB、PlayerC、PlayerD,应该等待 PlayerA、PlayerB、PlayerC 完成)。

但是,如果这不可能,至少我希望所有调用都是连续的。(我认为它们仍然应该是异步的,因此它们不会阻塞代码的其他不相关部分)。

目前,发生的情况是每个 RefreshProfileInfo 并行运行,这导致每次都命中后端(本例中为 9 次)。

相反,我想按顺序执行它们,这样只有第一个调用会命中后端,后续调用只会命中缓存。

我应该使用什么数据结构/方法?我无法弄清楚如何将单独的呼叫“连接”到彼此。我一直在玩 Task.WhenAll() 以及 SemaphoreSlim,但我不知道如何正确使用它们。

尝试失败

我失败的尝试背后的想法是有一个帮助类,我可以在其中调用一个函数 SequentialRequest(Task),它会按顺序运行以这种方式调用的所有任务。

List<Task> waitingTasks = new List<Task>();
object _lock = new object();

public async Task SequentialRequest(Task func)
{
    var waitingTasksCopy = new List<Task>();

    lock (_lock)
    {
        waitingTasksCopy = new List<Task>(waitingTasks);
        waitingTasks.Add(func); // Add this task to the waitingTasks (for future SequentialRequests)
    }

    // Wait for everything before this to finish
    if (waitingTasksCopy.Count > 0)
    {
        await Task.WhenAll(waitingTasksCopy);
    }

    // Run this task
    await func;
}

我认为这会起作用,但是“func”要么立即运行(而不是等待早期任务完成),要么根本不运行,这取决于我如何称呼它。

如果我使用它调用它,它会立即运行:

async Task testTask()
{
    await Task.Delay(4000);
}

如果我使用它调用它,它永远不会运行:

Task testTask = new Task(async () =>
{
    await Task.Delay(4000);
});

标签: c#asynchronousasync-await

解决方案


这就是您当前的尝试不起作用的原因:

// Run this task
await func;

上面的评论没有描述代码在做什么。在异步世界中,aTask代表一些已经在进行的操作。使用await;不会“运行”任务 await它是当前代码“异步等待”任务完成的一种方式。所以没有任何函数签名Task会起作用;该任务甚至在传递给该函数之前就已经在进行中。

您的问题实际上是关于缓存异步操作。一种方法是缓存Task<T>自身。目前,您的缓存保存结果(T);您可以更改缓存以保存检索这些结果的异步操作 ( Task<T>)。例如,如果您当前的缓存类型是ConcurrentDictionary<PlayerId, Player>,您可以将其更改为ConcurrentDictionary<PlayerId, Task<Player>>.

使用任务缓存,当您的代码检查缓存条目时,如果播放器数据已加载或已开始加载,它将找到现有条目。因为Task<T>代表一些已经在进行中(或已经完成)的异步操作。

这种方法的几个注意事项:

  1. 这仅适用于内存缓存。
  2. 想想你想如何处理错误。一个简单的缓存Task<T>也会缓存错误结果,这通常是不希望的。

上面的第二点是比较棘手的部分。发生错误时,您可能需要一些额外的逻辑来从缓存中删除错误的任务。如果错误处理代码首先阻止错误任务进入缓存,则加分(以及额外的复杂性)。

至少我希望所有呼叫都是连续的

嗯,这要容易得多。SemaphoreSlim是异步替换的lock,所以可以使用共享的SemaphoreSlimawait mySemaphoreSlim.WaitAsync();在开头调用RefreshProfileInfo,将主体放在 atry中,在finally块的末尾RefreshProfileInfo,调用mySemaphoreSlim.Release();。这将限制所有调用RefreshProfileInfo顺序运行。


推荐阅读