c# - 有没有比“一劳永逸”更好、更可靠的模式来同时处理可变数量的异步任务?
问题描述
我有以下代码:
while (!cancellationToken.IsCancellationRequested)
{
var connection = await listener.AcceptAsync(cancellationToken);
HandleConnectionAsync(connection, cancellationToken)
.FireAndForget(HandleException);
}
这FireAndForget
是一个扩展方法:
public static async void FireAndForget(this ValueTask task, Action<Exception> exceptionHandler)
{
try
{
await task.ConfigureAwait(false);
}
catch (Exception e)
{
exceptionHandler.Invoke(e);
}
}
是while loop
服务器生命周期。当新连接被接受时,它会启动一些“后台任务”,以便它可以处理这个新连接,然后while loop
在不等待任何东西的情况下返回接受新连接 - 暂停生命周期。
我不能await
HandleConnectionAsync
(暂停生命周期)在这里,因为我想立即接受另一个连接(如果有的话)并能够同时处理多个连接。HandleConnectionAsync
受 I/O 限制,一次处理一个连接,直到关闭(任务在一段时间后完成)。
必须单独处理连接 - 我不希望在处理一个连接时出现一些错误对其他连接产生任何影响的情况。
我在这里使用的“一劳永逸”解决方案有效,但一般规则是始终await
使用异步方法并且永远不要使用async void
.
似乎我已经违反了规则,那么在此处描述的情况下,是否有更好,也许更可靠的方法来同时处理可变数量的异步 I/O 绑定任务(任务数量随时间变化)?
更多信息:
- 甚至在返回连接之前,每次调用都会
AcceptAsync
分配系统资源,我希望尽可能避免这种情况(连接可能几个小时都不会返回(代码可能会“等待”几个小时)——直到某个外部客户端决定连接到我的服务器)。最好假设这是我不想同时/并行调用的方法 -AcceptAsync
一次就足够了 - 请考虑到我每天可以有数百万个客户端连接和断开我的服务器和服务器(
while loop
)可以工作很多天 - 我不知道在特定时间需要处理多少个连接
- 我确实知道我的程序能够同时处理的最大连接数
- 如果我达到了
maximum number of connections
限制,那么AcceptAsync
在其他一些活动连接关闭之前不会返回新连接,所以我不需要担心,但任何基于此限制的解决方案都必须考虑到活动连接可能会关闭并且我仍然需要处理新的连接 - 连接的数量会随着时间而变化。“一劳永逸”对此没有任何问题 - 的代码
HandleConnectionAsync
不相关 - 它只处理一个连接直到关闭(任务在一段时间后完成)并且是 I/O 绑定的(HandleConnectionAsync
一次处理一个连接,但当然我们可以启动多个HandleConnectionAsync
任务来同时处理多个连接 -这就是我用“一劳永逸”所做的)
解决方案
我假设更改为 SignalR 之类的东西不是一个可接受的解决方案。那将是我的第一个建议。
自定义服务器套接字是可以接受某种“即发即弃”的场景。我正在考虑向 AsyncEx 添加一种“任务管理器”类型,以使这种解决方案更容易,但还没有完成。
底线是您需要自己管理连接列表。“连接”对象可以包括一个Task
代表处理循环的;没关系。在那里也有其他属性也很有用(尤其是用于调试或管理目的),例如远程 IP。
所以我会这样处理它:
private readonly object _mutex = new object();
private readonly List<State> _connections = new List<State>();
private void Add(State state)
{
lock (_mutex)
_connections.Add(state);
}
private void Remove(State state)
{
lock (_mutex)
_connections.Remove(state);
}
public async Task RunAsync(CancellationToken cancellationToken)
{
while (true)
{
var connection = await listener.AcceptAsync(cancellationToken);
Add(new State(this, connection));
}
}
private sealed class State
{
private readonly Parent _parent;
public State(Parent parent, Connection connection, CancellationToken cancellationToken)
{
_parent = parent;
Task = ExecuteAsync(connection, cancellationToken);
}
private static async Task ExecuteAsync(Connection connection, CancellationToken cancellationToken)
{
try { await HandleConnectionAsync(connection, cancellationToken); }
finally { _parent.Remove(this); }
}
public Task Task { get; }
// other properties as desired, e.g., RemoteAddress
}
您现在有一个连接集合。您可以忽略State
对象中的任务(就像上面的代码所做的那样),这就像一劳永逸。或者你可以await
在某个时候将它们全部显示出来。例如:
public async Task RunAsync(CancellationToken cancellationToken)
{
try
{
while (true)
{
var connection = await listener.AcceptAsync(cancellationToken);
Add(new State(this, connection));
}
}
catch (OperationCanceledException)
{
// Wait for all connections to cancel.
// I'm not really sure why you would *want* to do this, though.
List<State> connections;
lock (_mutex) { connections = _connections.ToList(); }
await Task.WhenAll(connections.Select(x => x.Task));
}
}
然后很容易扩展State
对象,这样你就可以做一些有时对服务器应用程序有用的事情,例如:
- 列出此服务器连接到的所有远程地址。
- 等到特定连接完成。
- ...
笔记:
- 使用一种模式进行取消。传递令牌将产生一个
OperationCanceledException
,这是正常的取消模式。该代码以前也执行 awhile (!IsCancellationRequested)
,导致取消成功完成,这不是正常的取消模式。所以我删除了它,所以代码不再使用两种取消模式。 - 在使用原始套接字时,通常情况下,您需要不断地读取(即使您正在写入)并定期写入(即使您没有要发送的数据)。所以你
HandleConnectionAsync
应该启动一个异步读写器,然后使用Task.WhenAll
. - 我删除了对的调用,
HandleException
因为(可能)它所做的任何事情都应该由State.ExecuteAsync
. 如有必要,将其重新添加并不难。
推荐阅读
- mysql - 这种方法对 SQL 注入安全吗?
- c - 将 C 字符转换为同名的转义字符
- python - 视频上的 Sobel 边缘检测器 - 在输出文件中未检测到
- .htaccess - 如何将基本 URL 重定向到 .htaccess 中的特定 URL?
- python - 将 Pipfile 和 Pipfile.lock 复制到新项目
- discord.py - Discord.py 即使用户不在服务器中,如何通过 cog 中的用户 ID 对用户进行 DM 我运行命令?
- javascript - 当输入框以 HTML 为焦点时如何做某事
- ios - 在 Swift 中解码 JSON API - 重复的结构名称
- python - 在字典列表中打印条件语句
- reactjs - 在 React 中显示加载文本/微调器,直到组件完全加载(文本、图像...等)