首页 > 解决方案 > 具有可变计数的 Rx 缓冲

问题描述

我有一个IObservable<byte>我想拆分成IObservable<byte[]>的,大概是使用 Buffer()、Window()、Scan() 等的组合。

不过,我很难找到合适的 Rx 函数组合来处理我的具体情况。大多数关于该主题的 Q/A 都以答案结尾,提到您可以通过测试以查看项目(字节/字符)是否是分隔符,并以这种方式分解缓冲区。我的问题是它不仅仅是一个字节作为分隔符。在我的情况下,我正在读取 4 个字节的长度,然后我想从以下数据中缓冲该数量作为返回字节 []。

我尝试的一种方法是我可以制作一个IObservable<int>表示数据包长度的 a,其他用户可以使用它来将内容分解为缓冲的字节 []。也许是这样的:

IObservable<int> lengthsObservable = byteObservable
    .Buffer(4)
    .Select((b) => BitConverter.ToInt32(b.ToArray(), 0))
    ...

但这有一个问题是我不确定如何插入逻辑以在 int 转换后跳过数据。如何缓冲 4,转换为 int,跳过该数量,然后重复缓冲区 (4),然后继续?

我一直在尝试用 API 编写一些解决方案,但没有真正的运气。我的核心选择是使用非常自定义的累加器类创建一个非常繁琐的 Scan() 调用,但我觉得有更好的更洁净的方式。

TLDR Rx 资深人士是否知道 Buffer() 的常见组合模式,其分隔符不仅仅是一个单元?

编辑:问题的一个更具体的例子是将 anIObservable<byte>与输出分开:

0B 00 00 00 48 65 6C 6C 6F 20 57 6F 72 6C 64 12 00 00 00 54 68 61 6E 6B 73 20 66 6F 72 20 68 65 6C 70 69 6E 67

并将其处理成IObservable<byte[]>具有两个数组输出的:

48 65 6C 6C 6F 20 57 6F 72 6C 64 // Hello world

54 68 61 6E 6B 73 20 66 6F 72 20 68 65 6C 70 69 6E 67 // Thanks for helping

初始0B 00 00 00值是后面的字节块的长度。紧跟在它们之后的是另一个长度12 00 00 00,带有另一个字节块。

标签: reactive-programmingsystem.reactivereactive

解决方案


.Scan()这是一个“带有非常自定义的累加器类的繁琐调用”的示例。我不认为这有那么可怕。至少它很好地抽象:

class AccumulatorState
{
    public AccumulatorState()
        : this(4, 0, true, new byte[0])
    {}

    public AccumulatorState(int bytesToConsume, int bytesConsumed, bool isHeaderState, IEnumerable<byte> accumulatedBytes)
    {
        this.TotalBytesToConsume = bytesToConsume;
        this.BytesConsumed = bytesConsumed;
        this.IsHeaderState = isHeaderState;
        this.AccumulatedBytes = accumulatedBytes;
    }

    public int TotalBytesToConsume { get; }
    public int BytesConsumed { get; }
    public bool IsHeaderState { get; }
    public bool IsComplete => BytesConsumed == TotalBytesToConsume;
    public IEnumerable<byte> AccumulatedBytes { get; }

    public AccumulatorState ConsumeByteToNewState(byte b)
    {
        if(IsComplete)
        {
            if (IsHeaderState)
                return new AccumulatorState(ConvertHeaderToByteCount(AccumulatedBytes), 1, false, new[] { b });
            else
                return new AccumulatorState(4, 1, true, new[] { b });
        }

        return new AccumulatorState(TotalBytesToConsume, BytesConsumed + 1, IsHeaderState, AccumulatedBytes.Concat(new[] { b }));
    }

    private int ConvertHeaderToByteCount(IEnumerable<byte> bytes)
    {
        return bytes
            .Select(b => (int)b)
            .Reverse()
            .Aggregate(0, (total, i) => total * 16 + i);
    }
}

然后你可以这样称呼它:

var bytes = new byte[] { 0x0B, 0x00, 0x00, 0x00, 0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x20, 0x57, 
                         0x6F, 0x72, 0x6C, 0x64, 0x12, 0x00, 0x00, 0x00, 0x54, 0x68, 0x61, 
                         0x6E, 0x6B, 0x73, 0x20, 0x66, 0x6F, 0x72, 0x20, 0x68, 0x65, 0x6C,
                         0x70, 0x69, 0x6E, 0x67 };
bytes.ToObservable()
    .Scan (new AccumulatorState(), (state, b) => state.ConsumeByteToNewState(b))
    .Where(s => s.IsComplete && !s.IsHeaderState)
    .Select(s => s.AccumulatedBytes.ToArray())
    .Dump(); //Linqpad. Switch to a .Subscribe(a => {}) if you're using something else.

这输出IObservable<byte[]>带有两个输出:

48 65 6C 6C 6F 20 57 6F 72 6C 64 
54 68 61 6E 6B 73 20 66 6F 72 20 68 65 6C 70 69 6E 67 

推荐阅读