首页 > 解决方案 > 如何使用 Kestrel ConnectionHandler 处理传入的 TCP 消息?

问题描述

我想为我的 .NET Core 项目创建一个 TCP 侦听器。我正在使用 Kestrel 并ConnectionHandler为此配置了一个新的

kestrelServerOptions.ListenLocalhost(5000, builder =>
{
    builder.UseConnectionHandler<MyTCPConnectionHandler>();
});

所以我到目前为止是

internal class MyTCPConnectionHandler : ConnectionHandler
{
    public override async Task OnConnectedAsync(ConnectionContext connection)
    {
        IDuplexPipe pipe = connection.Transport;
        PipeReader pipeReader = pipe.Input;

        while (true)
        {
            ReadResult readResult = await pipeReader.ReadAsync();
            ReadOnlySequence<byte> readResultBuffer = readResult.Buffer;

            foreach (ReadOnlyMemory<byte> segment in readResultBuffer)
            {
                // read the current message
                string messageSegment = Encoding.UTF8.GetString(segment.Span);

                // send back an echo
                await pipe.Output.WriteAsync(segment);
            }

            if (readResult.IsCompleted)
            {
                break;
            }

            pipeReader.AdvanceTo(readResultBuffer.Start, readResultBuffer.End);
        }
    }
}

从 TCP 客户端向服务器应用程序发送消息时,代码可以正常工作。这条线await pipe.Output.WriteAsync(segment);现在就像一个回声。

出现了一些问题

标签: c#.net-coretcpkestrel

解决方案


  1. 这完全取决于协议;在很多情况下,你什么都不做也没关系;在其他情况下,如果您只想说“我还在这里”,则会发送特定的“ping”/“pong”帧
  2. “何时”完全取决于协议;waiting forreadResult.IsCompleted表示您正在等待将入站套接字标记为已关闭,这意味着在客户端关闭其出站套接字之前您不会发送任何内容;对于单次协议,这可能很好;但在大多数情况下,您需要查找单个入站帧,并回复该帧(并重复)
  3. 听起来您可能确实在编写一次性通道,即客户端只向服务器发送一件事,然后:服务器只向客户端发送一件事;在这种情况下,您可以执行以下操作:
while (true)
{
    var readResult = await pipeReader.ReadAsync();
    if (readResult.IsCompleted)
    {
        // TODO: not shown; process readResult.Buffer

        // tell the pipe that we consumed everything, and exit
        pipeReader.AdvanceTo(readResultBuffer.End, readResultBuffer.End);
        break;
    }
    else
    {
        // wait for the client to close their outbound; tell
        // the pipe that we couldn't consume anything
        pipeReader.AdvanceTo(readResultBuffer.Start, readResultBuffer.End);
    }

至于:

我是否应该将每个存储messageSegment在 a 中List<string>并将其连接到单个字符串时

这里首先要考虑的是,每个缓冲区段不一定包含确切数量的字符。由于您使用的是多字节编码的 UTF-8,因此一个段可能在开头和结尾包含部分字符,因此:解码它比这更复杂一些。

因此,检查IsSingleSegment缓冲区是很常见的。如果这是真的,您可以使用简单的代码:

if (buffer.IsSingleSegment)
{
    string message = Encoding.UTF8.GetString(s.FirstSpan);
    DoSomethingWith(message);
}
else
{
    // ... more complex
}

不连续的缓冲区情况要困难得多。基本上,您在这里有两个选择:

  1. 将段线性化为一个连续的缓冲区,可能从 租用一个超大的缓冲区ArrayPool<byte>.Shared,并在租用缓冲区UTF8.GetString正确部分上使用
  2. 在编码上使用GetDecoder()API,并使用它来填充新字符串,这在旧框架上意味着覆盖新分配的字符串,或者在新框架中意味着使用string.CreateAPI

坦率地说,“1”要简单得多。例如(未经测试):

public static string GetString(in this ReadOnlySequence<byte> payload,
    Encoding encoding = null)
{
    encoding ??= Encoding.UTF8;
    return payload.IsSingleSegment ? encoding.GetString(payload.FirstSpan)
        : GetStringSlow(payload, encoding);

    static string GetStringSlow(in ReadOnlySequence<byte> payload, Encoding encoding)
    {
        // linearize
        int length = checked((int)payload.Length);
        var oversized = ArrayPool<byte>.Shared.Rent(length);
        try
        {
            payload.CopyTo(oversized);
            return encoding.GetString(oversized, 0, length);
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(oversized);
        }
    }
}

推荐阅读