首页 > 解决方案 > .NET Socket ReadAsync 在写循环 Async / Await 期间被阻塞

问题描述

我正在使用 Async / Await 编写一个 TCP 服务器,该服务器需要根据从每个客户端接收到的内容向连接的客户端发送消息列表。在发送给客户端的每条消息之间,我需要:

  1. 等待确认/响应,然后发送下一条消息
  2. 如果 5 秒后没有确认,则重新发送命令

为此,ResponseReceived当预期响应到来时,我在我的 ConnClient 类上设置一个属性。然后,在ConnClient.SendListAsync例程中,我检查发送每个命令后该属性是否已更改。但是,在发送所有消息之前不会读取传入的响应SendListAsync,如下面的调试语句所示:

Sending Initial Message.
Received response, generate list of 3 initial commands and send them.
SendListAsync 5 second timeout w/o response.
SendListAsync 5 second timeout w/o response.
SendListAsync 5 second timeout w/o response.
Received response.
Received response.
Received response.

问题:如何正确防止ConnClient.SendListAsync阻塞传入读取?

public class Svr
{
    TcpListener listener;
    public async Task Listen(IPAddress iP, int port)
    {
        listener = new TcpListener(iP, port);
        listener.Start();
        while (true)
        {
            TcpClient client = await listener.AcceptTcpClientAsync();
            ConnClient cc = new ConnClient(client);
            await Receive(ConnClient);
        }
    }

    async Task Receive(ConnClient cc)
    {
        var headerSize = sizeof(short);
        byte[] buffer = new byte[4000];

        //Send initial msg
        await cc.socket.GetStream().WriteAsync(Strings.InitialMsg, 0, Strings.InitialMsg.Length); 

        while (true)
        {
            buffer = new byte[headerSize];
            if (!await ReadToBuffer(cc.socket.GetStream(), buffer, headerSize))
                return;

            var length = BitConverter.ToUInt16(new byte[2] { buffer[1], buffer[0] }, 0 );
            buffer = new byte[length];

            if (!await ReadToBuffer(cc.socket.GetStream(), buffer, length))
                return;

            await DoSomethingBasedOnReceived(messageBuffer, cc);
        }
    }

    async Task<Boolean> ReadToBuffer(NetworkStream stream, byte[] buffer, int bytesToRead)
    {
        int offset = 0;
        while (offset < bytesToRead)
        {
            var length = await stream.ReadAsync(buffer, offset, bytesToRead - offset);
            if (length == 0)
                return false;
            offset += length;
        }
        return true;
    }

    public async Task DoSomethingBasedOnReceived(byte[]  messageBuffer, ConnClient cc)
    {
        await SomeLogicToSetTheRRFlagIfMessageApplicable(messageBuffer, cc);
        List<byte[]> ListOfMessagesToSend = SomeLogicToDetermineListOfMessages(messageBuffer);
        await cc.SendListAsync(ListOfMessagesToSend);
    }
}

ConnClient 类,代表单个连接的客户端。

public class ConnClient
{
    public TcpClient socket { get; set; }
    public Boolean ResponseReceived { get; set; }
    public ConnClient (TcpClient cc)
    {socket = cc}

    public async Task SendListAsync(List<byte[]> messageList)
    {
        foreach (byte[] msg in messageList)
        {
            this.ResponseReceived = false;
            await stream.WriteAsync(msg, 0, msg.Length);

            int waitedSoFar = 0;
            while (waitedSoFar < 5000)
            {
                if (this.ResponseReceived == true)
                {
                    break;
                }
                waitedSoFar += 100;
                await Task.Delay(100);
            }
        }
    }
}

标签: c#socketsasynchronousasync-await

解决方案


您的第一个问题是您将无法接受新客户。

while (true)
{
        // accept the next connection
        TcpClient client = await listener.AcceptTcpClientAsync();

        // receive and send list
        ConnClient cc = new ConnClient(client);
        await Receive(ConnClient);

        // the loop cannot continue to receive the next connection 
        // until you have done with your receive
}

您将需要Receive独立执行,以便等待下一个连接,您可以在没有 an 的情况下调用它await它将作为async void运行),或者将其卸载到新任务。

删除等待

Receive(ConnClient);

卸载

Task.Run(() => Receive(ConnClient));

您的第二个问题是您的客户在发送时被阻止并且无法接收。再一次,您要么卸载,要么在没有等待的情况下运行。

正如@PeterDuniho提到的

鉴于OP已经在使用async/ await,并且 Receive()已经是async,没有理由使用Task.Run(). 无论哪种方式,它 都是一劳永逸的(除非他们更改代码以存储返回的任务),因此他们不妨一劳永逸地调用 to Receive()并将其包装在对Task.Run().

注意:创建可扩展的客户端/服务器套接字解决方案并非易事,我并不是要展示这一点。但是,它将解决您当前的问题。

无论哪种方式,都要非常注意错误。由于两种提议的解决方案都将在未观察到的情况下运行,因此需要处理异常


推荐阅读