首页 > 解决方案 > 如何使用 IObservable分割一个 IObservable进入 IObservable长短不一

问题描述

我有一个“值” IObservable<T>,它返回的T元素必须按顺序组合成可变长度数组,我有一个“控件” IObservable<int>,它告诉我下一个数组必须有多长。删除一个元素、重复它或让结果乱序会使结果变得毫无意义。

这是针对我在 Rx.NET 中重写的串行连接机器人项目。

IObservable<char> values = new [] {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H' }.ToObservable();
IObservable<int> control = new [] { 1, 4, 2 }.ToObservable();
IObservable<char[]> result = control.SelectMany(length => values.Take(length).ToArray());

我想看到这样的东西:

values  ----A--B--C--D--E--F--G--H-->
control --1-----4---------------2--->
result  ---[A]---------[BCDE]--[FG]->

但到目前为止我的尝试导致

-[A]-[AB]-[ABCD]->

标签: c#rx.net

解决方案


好的,这是满足我所有需求的代码。Progman,您根据您的建议帮助完成了这项工作。在这里,它被整齐地包装起来Observable.Create并变成了一个扩展方法 on IObservable<T>,带有一个一次性处理压缩序列上的订阅。

    public static IObservable<T[]> Chop<T>(this IObservable<T> values, IObservable<int> control) =>
        Observable.Create<T[]>(observer => 
        {
            List<T> buffer = new List<T>();
            return values.Zip(control.SelectMany(length => Enumerable.Repeat(length, length)), 
                              (value, length) => (value, length))
                         .Subscribe(next => 
                         {
                             buffer.Add(next.value);
                             if (buffer.Count == next.length)
                             {
                                 observer.OnNext(buffer.ToArray());
                                 buffer.Clear();
                             }
                         });
        });

样本输出:

values  ----A--B--C--D--E--F--G--H--I--J--K--L--M--N--O--P-->
control --1-4-2-0-3-3--------------------------------------->
result  ---[A]---------[BCDE]-[FG]----[HIJ]----[KLM]-------->

推荐阅读