首页 > 解决方案 > 重新组合来自同一反应流的元素

问题描述

我想要实现的可以描述如下:

这个概念看起来像这样:

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

var s = Observable.Interval(TimeSpan.FromMilliseconds(100)).Publish().AutoConnect();

var s1 = s.Where(x => x % 5 == 0);
var s2 = s.Sample(TimeSpan.FromMilliseconds(1000));

new[] {s1, s2}.Merge()./*Distinct().*/Subscribe(Console.WriteLine, cts.Token);

await Task.Delay(Timeout.InfiniteTimeSpan, cts.Token).ContinueWith(_=>_, TaskContinuationOptions.OnlyOnCanceled);

原始来源很热。如果没有Distinct我显然会得到重复的值,它看起来会产生我期望看到的东西。

有没有更好的方法,事实上,第一个派生流不是周期性的?

标签: c#system.reactiverx.net

解决方案


您可以在源 observable 中附加索引,然后DistinctUntilChanged在最终合并的 observable 中应用。

var withIndex = s.Select((x, i) => (Item : x, Index : i));
var s1 = withIndex.Where(p => p.Item % 5 == 0);
var s2 = withIndex.Sample(TimeSpan.FromMilliseconds(1000));

new[] { s1, s2 }
    .Merge()
    .DistinctUntilChanged(p => p.Index) // discard duplicates
    .Select(p => p.Item) // discard the index
    .Subscribe(Console.WriteLine, cts.Token);

我猜操作符DistinctUntilChanged比 更轻量级Distinct,因为它只缓存最新的元素。


推荐阅读