首页 > 解决方案 > 处理在不同线程中读取的 TCP Socket 数据

问题描述

我正在开发一个程序来与每 70 毫秒通过 TCP 通道发送数据的设备集成。

我正在使用 Socket.BeginReceive 和 Socket.EndReceive 方法来读取数据。逻辑在以下伪代码中描述

private void OnReceived(IAsyncResult ar)
{
    var rcvdDataLength = m_tcpSocket.EndReceive(ar);
    Array.Copy(m_tempRecvBuffer, 0, m_mainBuffer, m_mainBufferDataIndex, rcvdDataLength);
    if (CheckIfValidHeaderAndBodyReceived())
    {
        var actualData = new byte[headerLen + BodyLen];
        Array.Copy(m_mainBuffer, m_dataIndex, actualData, 0, headerLen + BodyLen);
        Process(actualData);
    }
    m_tcpSocket.BeginReceive(m_tempRecvBuffer, 0, m_tempRecvBuffer.Length, SocketFlags.None,
        OnReceived, null);
}

上面代码中描述的流程函数负责实现业务逻辑。处理功能目前大约需要 300 毫秒。

所以消费者(需要 300 毫秒)比生产者(每 70 毫秒发布一次数据)慢。我是否需要异步运行此 Process 函数以避免延迟?还是TCP 层的流量控制方面负责这个?

标签: c#.netsocketstcpwinsock

解决方案


我是否需要异步运行此 Process 函数以避免延迟?

这取决于您的应用程序以及最终您自己的优先级决定。严格来说,不,您不需要异步执行任何操作,但这可能是件好事。

我通常使用和推荐的方法是一个专用于接口的线程,它通过队列与应用程序的其余部分进行交互。当通信线程接收到消息时,它会锁定一个队列并将它们推入。当主应用程序准备好使用该数据时,它会锁定该队列并尽可能多地退出队列。这是一个简单、健壮且可靠的机制。转到您的伪代码:

private void OnReceived(IAsyncResult ar)
{
    var rcvdDataLength = m_tcpSocket.EndReceive(ar);
    Array.Copy(m_tempRecvBuffer, 0, m_mainBuffer, m_mainBufferDataIndex, rcvdDataLength);
    if (CheckIfValidHeaderAndBodyReceived())
    {
        var actualData = new byte[headerLen + BodyLen];
        Array.Copy(m_mainBuffer, m_dataIndex, actualData, 0, headerLen + BodyLen);
        lock(messageQueue)
        {
           messageQueue.Enqueue(actualData);
        }
    }
    m_tcpSocket.BeginReceive(m_tempRecvBuffer, 0, m_tempRecvBuffer.Length, SocketFlags.None,
        OnReceived, null);
}

然后在你的应用程序的某个地方:

void ProcessQueue()
{
   Queue<byte[]> tempQueue = new Queue<byte[]>();
   lock(messageQueue)
   {
      // Drain the queue so we can release the lock ASAP
      while(messageQueue.Count > 0)
      {
         tempQueue.Enqueue(messageQueue.Dequeue());
      }
   }
   while(tempQueue.Count > 0)
   {
      Process(tempQueue.Dequeue());
   }
}

推荐阅读