rsocket - 第 3 个消息帧的 RSocket Net,pipereader.ReadAsync 被截断,导致 MessageFramePeek 方法出错
问题描述
我正在使用 RSocket.Net 作为 Java Spring Boot RSocket 服务器的网络客户端。服务器将报价消息发送回客户端。示例代码一直有效,直到第三条报价消息被截断。
static public async Task Handler(IRSocketProtocol sink, PipeReader pipereader, CancellationToken cancellation)
{
Logs.Enter();
//The original implementation was a state-machine parser with resumability. It doesn't seem like the other implementations follow this pattern and the .NET folks are still figuring this out too - see the internal JSON parser discussion for how they're handling state machine persistence across async boundaries when servicing a Pipeline. So, this version of the handler only processes complete messages at some cost to memory buffer scalability.
//Note that this means that the Pipeline must be configured to have enough buffering for a complete message before source-quenching. This also means that the downstream consumers don't really have to support resumption, so the interface no longer has the partial buffer methods in it.
while (!cancellation.IsCancellationRequested)
{
var read = await pipereader.ReadAsync(cancellation);
var buffer = read.Buffer;
第一条消息是这样的,缓冲区大小为 942
下一条消息类似。那么第三条消息要么是 5657 的尺寸太大了!!?
或者它在 165 时太小了
这会导致MessageFramePeek
方法中的错误
static public (int Length, bool IsEndOfMessage) MessageFramePeek(ReadOnlySequence<byte> sequence) { var reader = new SequenceReader<byte>(sequence); return reader.TryRead(out byte b1) && reader.TryRead(out byte b2) && reader.TryRead(out byte b3) ? ((b1 << 8 * 2) | (b2 << 8 * 1) | (b3 << 8 * 0), true) : (0, false); }
更新我检查了服务器和客户端上的编码类型是 JSON,以便字符串不会与线路上的对象混合。仍然存在同样的问题。
更新如果 RSocket.Core 建立在网络标准或网络框架上,则会发生同样的问题。
解决方案
推荐阅读
- yugabyte-db - 无法访问 yugabyte tserver 和 master admin ui
- python - 需要使用python删除列表中具有某些分隔字符的单词
- c# - 自包含,服务要求安装运行时
- javascript - 无法显示二级数组 JSON
- r - 启动时出现 R 错误:评估嵌套太深
- windows - 如何确定在经典 Windows 应用程序中如何调整窗口大小
- sql-server - SQL Server 连接失败 - Debezium - Kafka Connect
- java - 如何在电报机器人中看到来自 Java Web 应用程序的传入 POST 请求
- swift - 链接后无法使用 IBOutlets
- java - 从 Java 调用 Kotlin 时如何获取错误的行号?