c# - 重新组合来自同一反应流的元素
问题描述
我想要实现的可以描述如下:
- 我有一个样本流,它们是带时间戳的测量值。这是原始流。
- 我在原始流上应用了一个过滤器,由此我得到了一个派生流(它将是这个问题的滞后过滤器,但为了简单起见,我在
Where
这里使用一个运算符) - 为了消除缓慢变化的值导致的巨大差距,我将一个
Sample
运算符应用于原始流 - 我将两个流合并到一个结果流中
这个概念看起来像这样:
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var s = Observable.Interval(TimeSpan.FromMilliseconds(100)).Publish().AutoConnect();
var s1 = s.Where(x => x % 5 == 0);
var s2 = s.Sample(TimeSpan.FromMilliseconds(1000));
new[] {s1, s2}.Merge()./*Distinct().*/Subscribe(Console.WriteLine, cts.Token);
await Task.Delay(Timeout.InfiniteTimeSpan, cts.Token).ContinueWith(_=>_, TaskContinuationOptions.OnlyOnCanceled);
原始来源很热。如果没有Distinct
我显然会得到重复的值,它看起来会产生我期望看到的东西。
有没有更好的方法,事实上,第一个派生流不是周期性的?
解决方案
您可以在源 observable 中附加索引,然后DistinctUntilChanged
在最终合并的 observable 中应用。
var withIndex = s.Select((x, i) => (Item : x, Index : i));
var s1 = withIndex.Where(p => p.Item % 5 == 0);
var s2 = withIndex.Sample(TimeSpan.FromMilliseconds(1000));
new[] { s1, s2 }
.Merge()
.DistinctUntilChanged(p => p.Index) // discard duplicates
.Select(p => p.Item) // discard the index
.Subscribe(Console.WriteLine, cts.Token);
我猜操作符DistinctUntilChanged
比 更轻量级Distinct
,因为它只缓存最新的元素。
推荐阅读
- eclipse - PHP 8 脚本在带有 Eclipse 2020-06 的 Chrome 上运行
- flutter - Flutter - 断言失败:assetName!= null 不正确
- mysql - SQL 语法问题:如何获取前 4 个字母和大写字母
- python-3.x - python中相互依赖的静态方法
- html - 引导行中的垂直对齐
- reverse-engineering - 使用 dis 反汇编操作码
- shell - 如何使用 curl 从 cloudflare 下载文件?
- sql - 我想将 t-sql 中的 json 行组合成单个 json 行
- bash - Visual Studio Code 任务没有环境变量 SEt
- javascript - 如何使用 Firebase 9.x + Flow