首页 > 解决方案 > 如何从多个线程写入 TcpListener?

问题描述

假设我有一个静态列表List<string> dataQueue,其中数据不断以随机间隔添加,并且以不同的速率(1-1000 个条目/秒)

我的主要目标是将列表中的数据发送到服务器,我正在使用一个TcpClient类。

到目前为止我所做的是,我在单线程中将数据同步发送到客户端

byte[] bytes = Encoding.ASCII.GetBytes(message);

tcpClient.GetStream().Write(bytes, 0, bytes.Length);
//The client is already connected at the start

一旦发送数据,我就会从列表中删除该条目。

这工作正常,但是发送数据的速度不够快,列表被填充并消耗更多内存,因为列表被迭代并被一一发送。

我的问题是我可以使用同一个tcpClient对象从另一个线程同时写入,还是可以使用另一个tcpClient对象与另一个线程中的同一服务器建立新连接?将此数据发送到服务器的最有效(最快)方式是什么?

PS:我不想使用UDP

标签: c#tcpclient

解决方案


正确的; 这是一个有趣的话题,我想我可以发表意见。听起来您在多个线程之间共享一个套接字-只要您非常小心地进行操作,就完全有效。TCP 套接字是字节的逻辑流,因此您不能同时使用它,但是如果您的代码足够快,则可以非常有效地共享套接字,每条消息都是连续的。

可能首先要看的是:您实际上是如何将数据写入套接字的?你的框架/编码代码是什么样的?如果这段代码很糟糕/效率低下:它可能会被改进。例如,它是否通过天真的调用间接创建了一个新的byte[]per ?是否涉及多个缓冲区?是否在取景时多次调用?它是如何解决数据包碎片问题的?ETCstringEncodeSend

作为尝试的第一件事 - 您可以避免一些缓冲区分配:

var enc = Encoding.ASCII;
byte[] bytes = ArrayPool<byte>.Shared.Rent(enc.GetMaxByteCount(message.Length));
// note: leased buffers can be oversized; and in general, GetMaxByteCount will
// also be oversized; so it is *very* important to track how many bytes you've used
int byteCount = enc.GetBytes(message, 0, message.Length, bytes, 0);
tcpClient.GetStream().Write(bytes, 0, byteCount);
ArrayPool<byte>.Shared.Return(bytes);

byte[]这使用了一个租用的缓冲区来避免每次都创建一个缓冲区——这可以大大提高 GC 的影响。如果是我,我也可能会使用 rawSocket而不是TcpClientandStream抽象,坦率地说,这不会给你带来很多好处。注意:如果您有其他框架要做:包括在您租用的缓冲区的大小中,在写入每个片段时使用适当的偏移量,并且只写入一次- 即准备整个缓冲区一次 - 避免多次调用Send.


现在,听起来你有一个队列和专门的作家;即您的应用程序代码附加到队列中,您的编写代码将事物出列并将它们写入套接字。这是一种合理的实现方式,尽管我会添加一些注释:

  • List<T>实现队列是一种糟糕的方式——从一开始就删除东西需要重新洗牌(这很昂贵);如果可能的话,更喜欢Queue<T>,这是为您的场景完美实现的
  • 它将需要同步,这意味着您需要确保一次只有一个线程更改队列 - 这通常通过简单的lock, 即lock(queue) {queue.Enqueue(newItem);}SomeItem next; lock(queue) { next = queue.Count == 0 ? null : queue.Dequeue(); } if (next != null) {...write it...}.

这种方法很简单,并且在避免数据包碎片方面具有一些优势——写入器可以使用暂存缓冲区,并且仅在缓冲某个阈值或队列为空时才实际写入套接字——但它发生停顿时有可能造成大量积压。

然而!发生积压的事实表明有些事情没有跟上;这可能是网络(带宽)、远程服务器(CPU)——或者可能是本地出站网络硬件。如果这只发生在小问题中,然后自行解决 - 很好(特别是当一些出站消息很大时发生),但是:一个值得关注的问题。

如果这种积压问题反复出现,那么坦率地说,您需要考虑到您对当前设计已经饱和,因此您需要解除其中一个瓶颈:

  • 确保您的编码代码有效是步骤零
  • 您可以将编码步骤移动到应用程序代码中,即在获取锁定之前准备一个帧,对消息进行编码,并且只将一个完全准备好的帧加入队列;这意味着编写器线程除了出列、写入、回收之外不需要做任何事情——但它使缓冲区管理更加复杂(显然,在缓冲区完全处理之前你不能回收缓冲区)
  • 如果您还没有采取措施来实现这一目标,那么减少数据包碎片可能会有很大帮助
  • 否则,您可能需要(在调查阻塞后):
    • 更好的本地网络硬件(NIC)或物理机器硬件(CPU等)
    • 多个套接字(和队列/工作者)之间循环,分配负载
    • 可能是多个服务器进程,每个服务器都有一个端口,因此您的多个套接字正在与不同的进程通信
    • 更好的服务器
    • 多台服务器

注意:在任何涉及多个套接字的场景中,你要小心不要发疯,并且有太多的专用工作线程;如果该数字超过 10 个线程,您可能需要考虑其他选项 - 可能涉及异步 IO 和/或管道(如下)。


为了完整起见,另一种基本方法是从 app-code 编写;这种方法更简单,并且避免了未发送工作的积压,但是:这意味着现在您的应用程序代码线程本身将在负载下备份。如果您的应用程序代码线程实际上是工作线程,并且它们在 sync/ 上被阻塞lock,那么这可能非常糟糕;您不想使线程池饱和,因为您最终可能会遇到没有线程池线程可用于满足解锁任何处于活动状态的写入器所需的 IO 工作的情况,这可以让您真正进入问题。这通常不是您想要用于高负载/高容量的方案,因为它很快就会出现问题 - 并且很难避免数据包碎片,因为每个单独的消息都无法知道是否会有更多消息进入.


最近要考虑的另一个选择是“管道”;这是 .NET 中的一个新 IO 框架,专为大容量网络而设计,特别关注异步 IO、缓冲区重用以及实现良好的缓冲区/积压日志机制,使得使用简单的writer 方法(在写入时同步)并且不会将其转换为直接发送- 它表现为一个可以访问积压的异步写入器,这使得避免数据包碎片化变得简单而高效。这是一个相当先进的领域,但它可能非常有效。对您来说有问题的部分是:它是为整个异步使用而设计的,甚至用于写入 - 因此,如果您的应用程序代码当前是同步的,那么实现起来可能会很痛苦。但是:这是一个需要考虑的领域。我有很多关于这个主题的博客文章,以及一系列使用管道的 OSS 示例和真实库,我可以指出这些,但是:这不是“快速修复”——它是彻底改革整个 IO 层。它也不是灵丹妙药——它只能消除由于本地 IO 处理成本而产生的开销。


推荐阅读