reactive-programming - 具有可变计数的 Rx 缓冲
问题描述
我有一个IObservable<byte>
我想拆分成IObservable<byte[]>
的,大概是使用 Buffer()、Window()、Scan() 等的组合。
不过,我很难找到合适的 Rx 函数组合来处理我的具体情况。大多数关于该主题的 Q/A 都以答案结尾,提到您可以通过测试以查看项目(字节/字符)是否是分隔符,并以这种方式分解缓冲区。我的问题是它不仅仅是一个字节作为分隔符。在我的情况下,我正在读取 4 个字节的长度,然后我想从以下数据中缓冲该数量作为返回字节 []。
我尝试的一种方法是我可以制作一个IObservable<int>
表示数据包长度的 a,其他用户可以使用它来将内容分解为缓冲的字节 []。也许是这样的:
IObservable<int> lengthsObservable = byteObservable
.Buffer(4)
.Select((b) => BitConverter.ToInt32(b.ToArray(), 0))
...
但这有一个问题是我不确定如何插入逻辑以在 int 转换后跳过数据。如何缓冲 4,转换为 int,跳过该数量,然后重复缓冲区 (4),然后继续?
我一直在尝试用 API 编写一些解决方案,但没有真正的运气。我的核心选择是使用非常自定义的累加器类创建一个非常繁琐的 Scan() 调用,但我觉得有更好的更洁净的方式。
TLDR Rx 资深人士是否知道 Buffer() 的常见组合模式,其分隔符不仅仅是一个单元?
编辑:问题的一个更具体的例子是将 anIObservable<byte>
与输出分开:
0B 00 00 00 48 65 6C 6C 6F 20 57 6F 72 6C 64 12 00 00 00 54 68 61 6E 6B 73 20 66 6F 72 20 68 65 6C 70 69 6E 67
并将其处理成IObservable<byte[]>
具有两个数组输出的:
48 65 6C 6C 6F 20 57 6F 72 6C 64 // Hello world
和
54 68 61 6E 6B 73 20 66 6F 72 20 68 65 6C 70 69 6E 67 // Thanks for helping
初始0B 00 00 00
值是后面的字节块的长度。紧跟在它们之后的是另一个长度12 00 00 00
,带有另一个字节块。
解决方案
.Scan()
这是一个“带有非常自定义的累加器类的繁琐调用”的示例。我不认为这有那么可怕。至少它很好地抽象:
class AccumulatorState
{
public AccumulatorState()
: this(4, 0, true, new byte[0])
{}
public AccumulatorState(int bytesToConsume, int bytesConsumed, bool isHeaderState, IEnumerable<byte> accumulatedBytes)
{
this.TotalBytesToConsume = bytesToConsume;
this.BytesConsumed = bytesConsumed;
this.IsHeaderState = isHeaderState;
this.AccumulatedBytes = accumulatedBytes;
}
public int TotalBytesToConsume { get; }
public int BytesConsumed { get; }
public bool IsHeaderState { get; }
public bool IsComplete => BytesConsumed == TotalBytesToConsume;
public IEnumerable<byte> AccumulatedBytes { get; }
public AccumulatorState ConsumeByteToNewState(byte b)
{
if(IsComplete)
{
if (IsHeaderState)
return new AccumulatorState(ConvertHeaderToByteCount(AccumulatedBytes), 1, false, new[] { b });
else
return new AccumulatorState(4, 1, true, new[] { b });
}
return new AccumulatorState(TotalBytesToConsume, BytesConsumed + 1, IsHeaderState, AccumulatedBytes.Concat(new[] { b }));
}
private int ConvertHeaderToByteCount(IEnumerable<byte> bytes)
{
return bytes
.Select(b => (int)b)
.Reverse()
.Aggregate(0, (total, i) => total * 16 + i);
}
}
然后你可以这样称呼它:
var bytes = new byte[] { 0x0B, 0x00, 0x00, 0x00, 0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x20, 0x57,
0x6F, 0x72, 0x6C, 0x64, 0x12, 0x00, 0x00, 0x00, 0x54, 0x68, 0x61,
0x6E, 0x6B, 0x73, 0x20, 0x66, 0x6F, 0x72, 0x20, 0x68, 0x65, 0x6C,
0x70, 0x69, 0x6E, 0x67 };
bytes.ToObservable()
.Scan (new AccumulatorState(), (state, b) => state.ConsumeByteToNewState(b))
.Where(s => s.IsComplete && !s.IsHeaderState)
.Select(s => s.AccumulatedBytes.ToArray())
.Dump(); //Linqpad. Switch to a .Subscribe(a => {}) if you're using something else.
这输出IObservable<byte[]>
带有两个输出:
48 65 6C 6C 6F 20 57 6F 72 6C 64
54 68 61 6E 6B 73 20 66 6F 72 20 68 65 6C 70 69 6E 67
推荐阅读
- docker - Docker 桌面联网 Windows 和 Linux 节点
- bash - 无法使用 Bash 脚本 JQ 遍历值中有空格的 JSON 内部数组
- javascript - TypeError:未定义不是对象(评估“this”)
- c# - 如何拆分字符串并将其插入 DataTemplate 中的 Binding
- python-3.x - Python:使用 Numba 签名
- ethereum - 广播离线交易
- firebase-authentication - 如何使用存储在 Firebase 身份验证模拟器中的用户在 Functions Emulator 中验证 Firebase 云功能?
- r - 使用栅格中的多个函数提取值
- bash - 根据目录中的最新文件名替换文本文件中的参数
- javascript - 范围值需要正则表达式