首页 > 解决方案 > 如何同步读取和写入来自不同任务的 NetworkStream?

问题描述

我有一个类使用以下方法处理我的网络连接:

SendNetworkMessageAsync:此方法应在网络流上发送一个,如果设置了相应的参数,则等待响应,直到发生超时。类NetworkMessageModelNetworkCommandModel是辅助类,用于处理转换、消息长度等。当调用这些方法时,对应的套接字已经创建并连接到服务器。

public async Task<NetworkCommandModel> SendNetworkMessageAsync(NetworkMessageModel msg, bool awaitAnswer, int timeout = 500)
{
    var timeoutTask = Task.Delay(timeout);
    _ = networkStream.WriteAsync(msg.EncodedMessageHeader, 0, msg.EncodedMessageHeader.Length);
    _ = networkStream.WriteAsync(msg.EncodedMessageBody, 0, msg.EncodedMessageBody.Length);

    if (!awaitAnswer) return null;

    var taskReseiveResponse = ReceiveNetworkMessageAsync();
    if (await Task.WhenAny(taskReseiveResponse, timeoutTask) == taskReseiveResponse)
    {
        return NetworkCommandModel.FromStream(taskReseiveResponse.Result.EncodedMessageBody);
    }
    else
    {
        return new NetworkCommandModel(Util.NetworkCommandTypeEnum.COMMANDFAILED, "");
    }
}

ReceiveNetworkMessageAsync并且ReadAsync应该处理有关从网络流接收消息的所有事情。

private async Task<NetworkMessageModel> ReceiveNetworkMessageAsync()
{
    var headerbytes = await ReadAsync(4);
    int bodyLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(headerbytes, 0));
    var bodybytes = await ReadAsync(bodyLength);
    return new NetworkMessageModel(bodybytes, headerbytes);
}

private async Task<byte[]> ReadAsync(int bytesToRead)
{
    var buffer = new byte[bytesToRead];
    var bytesRead = 0;

    while (bytesRead < bytesToRead)
    {
        var bytesReceived = await networkStream.ReadAsync(buffer, bytesRead, (bytesToRead - bytesRead)).ConfigureAwait(false);
        if (bytesReceived == 0)
        {
            throw new Exception("Socket Closed"); //TODO: Handle unexpected Socket closing error
        }
        bytesRead += bytesReceived;
    }
    
    return buffer;
}

这在大多数情况下都可以正常工作。但是,如果从不同的任务同时调用这些方法,则整个设置将失败。例如:我已经实现了一个“连接检查”,它是一个任务,它会定期向服务器发送一条消息并在超时内等待响应。如果我同时收到来自服务器的长消息和我的连接检查任务调用ReceiveNetworkMessageAsync(),则响应将是两个预期答案的混合。

我可以确保对 的NetworkStream调用一个接一个地调用吗?

标签: c#socketstask-parallel-library

解决方案


我假设 TCP/IP 协议没有可用于区分响应的命令/响应标识符。如果协议确实有一个标识符,那么通常的解决方案是拥有一个未完成命令的字典,并在收到响应时通过标识符查找相关的命令。但是如果协议没有标识符,您的代码一次只能发送一个命令,直到收到响应。我假设协议在这个答案的其余部分没有这个标识符。

有几种方法可以处理这个问题。更健壮和复杂的方法是创建一个服务员队列。对更高级别 API 的每次调用都会将一个服务员排入该队列。服务员将包含一个TaskCompletionSource<T>最终用于完成从高级 API 返回的任务。然后,您将有一个单独的“运行器”任务,它只处理该队列,执行写入,然后读取,然后完成该TaskCompletionSource<T>.

可行,但有点参与。

更简单的方法也可能适用于您的需要。您的每个更高级别的 API 调用只需要锁定套接字,直到它的响应到达。然后,当套接字已经忙时,其他命令将排队。SemaphoreSlim是适用于此的异步兼容互斥原语。该实现比构建您自己的队列更简单,但您也会丢失一些不错的行为,例如 FIFO(或优先请求,如果需要)。

更简单的互斥方法如下所示:

private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1);
public async Task<NetworkCommandModel> SendNetworkMessageAsync(NetworkMessageModel msg, bool awaitAnswer, int timeout = 500)
{
  await _mutex.WaitAsync();
  try
  {
    ... // existing code
  }
  finally
  {
    _mutex.Release();
  }
}

其他注意事项:

  • 不要丢弃任务。应该await编辑写入。至少,这可以让您看到套接字错误。
  • 发生超时时,套接字将处于未知状态。它仍然是一个可行的连接吗?它是否得到了最后一个命令,所以下一个响应将是超时命令?您的代码无法知道这些问题的答案,因此对超时的唯一正确响应是关闭套接字并重新连接。

推荐阅读