首页 > 解决方案 > 有没有比“一劳永逸”更好、更可靠的模式来同时处理可变数量的异步任务?

问题描述

我有以下代码:

while (!cancellationToken.IsCancellationRequested)
{
    var connection = await listener.AcceptAsync(cancellationToken);

    HandleConnectionAsync(connection, cancellationToken)
        .FireAndForget(HandleException);
}

FireAndForget是一个扩展方法:

public static async void FireAndForget(this ValueTask task, Action<Exception> exceptionHandler)
{
    try
    {
        await task.ConfigureAwait(false);
    }
    catch (Exception e)
    {
        exceptionHandler.Invoke(e);
    }
}

while loop服务器生命周期。当新连接被接受时,它会启动一些“后台任务”,以便它可以处理这个新连接,然后while loop在不等待任何东西的情况下返回接受新连接 - 暂停生命周期。

我不能await HandleConnectionAsync(暂停生命周期)在这里,因为我想立即接受另一个连接(如果有的话)并能够同时处理多个连接。HandleConnectionAsync受 I/O 限制,一次处理一个连接,直到关闭(任务在一段时间后完成)。

必须单独处理连接 - 我不希望在处理一个连接时出现一些错误对其他连接产生任何影响的情况。

我在这里使用的“一劳永逸”解决方案有效,但一般规则是始终await使用异步方法并且永远不要使用async void.

似乎我已经违反了规则,那么在此处描述的情况下,是否有更好,也许更可靠的方法来同时处理可变数量的异步 I/O 绑定任务(任务数量随时间变化)?

更多信息:

标签: c#.netasync-await

解决方案


我假设更改为 SignalR 之类的东西不是一个可接受的解决方案。那将是我的第一个建议。

自定义服务器套接字是可以接受某种“即发即弃”的场景。我正在考虑向 AsyncEx 添加一种“任务管理器”类型,以使这种解决方案更容易,但还没有完成。

底线是您需要自己管理连接列表。“连接”对象可以包括一个Task代表处理循环的;没关系。在那里也有其他属性也很有用(尤其是用于调试或管理目的),例如远程 IP。

所以我会这样处理它:

private readonly object _mutex = new object();
private readonly List<State> _connections = new List<State>();

private void Add(State state)
{
  lock (_mutex)
    _connections.Add(state);
}
private void Remove(State state)
{
  lock (_mutex)
    _connections.Remove(state);
}

public async Task RunAsync(CancellationToken cancellationToken)
{
  while (true)
  {
    var connection = await listener.AcceptAsync(cancellationToken);

    Add(new State(this, connection));
  }
}

private sealed class State
{
  private readonly Parent _parent;

  public State(Parent parent, Connection connection, CancellationToken cancellationToken)
  {
    _parent = parent;
    Task = ExecuteAsync(connection, cancellationToken);
  }

  private static async Task ExecuteAsync(Connection connection, CancellationToken cancellationToken)
  {
    try { await HandleConnectionAsync(connection, cancellationToken); }
    finally { _parent.Remove(this); }
  }

  public Task Task { get; }
  // other properties as desired, e.g., RemoteAddress
}

您现在有一个连接集合。您可以忽略State对象中的任务(就像上面的代码所做的那样),这就像一劳永逸。或者你可以await在某个时候将它们全部显示出来。例如:

public async Task RunAsync(CancellationToken cancellationToken)
{
  try
  {
    while (true)
    {
      var connection = await listener.AcceptAsync(cancellationToken);

      Add(new State(this, connection));
    }
  }
  catch (OperationCanceledException)
  {
    // Wait for all connections to cancel.
    // I'm not really sure why you would *want* to do this, though.
    List<State> connections;
    lock (_mutex) { connections = _connections.ToList(); }
    await Task.WhenAll(connections.Select(x => x.Task));
  }
}

然后很容易扩展State对象,这样你就可以做一些有时对服务器应用程序有用的事情,例如:

  • 列出此服务器连接到的所有远程地址。
  • 等到特定连接完成。
  • ...

笔记:

  • 使用一种模式进行取消。传递令牌将产生一个OperationCanceledException,这是正常的取消模式。该代码以前也执行 a while (!IsCancellationRequested),导致取消成功完成,这不是正常的取消模式。所以我删除了它,所以代码不再使用两种取消模式。
  • 在使用原始套接字时,通常情况下,您需要不断地读取(即使您正在写入)并定期写入(即使您没有要发送的数据)。所以你HandleConnectionAsync应该启动一个异步读写器,然后使用Task.WhenAll.
  • 我删除了对的调用,HandleException因为(可能)它所做的任何事情都应该由State.ExecuteAsync. 如有必要,将其重新添加并不难。

推荐阅读