首页 > 解决方案 > 遇到特定值时暂停流

问题描述

我有一个流,每当遇到某个值时我想暂停/停留。最优雅的方法是什么?

想象一下幻数是 4 - 这将阻止在 x 时间内发出任何其他值。如果它们在暂停期间发生,它还将防止发出“陈旧”值,但在暂停之后,它将切换到最近的值

Source +-1-2-3-4-5-6-7-8-9
Pause          +----x
Output +-1-2-3-4----67-8-9

标签: c#system.reactive

解决方案


您可以使用CombineLatest()将源与另一个处理暂停状态的可观察对象结合起来。合并后使用一个简单Where()的过滤当前暂停的条目。请参阅以下源代码:

ISubject<bool> pause = new BehaviorSubject<bool>(false);

IObservable<long> original = Observable.Interval(TimeSpan.FromSeconds(1));

IObservable<long> controlled = original.CombineLatest(pause, (o, p) => {
    return new { o, p};
})
    .Where(it => !it.p)
    .Select(it => it.o);

Stopwatch sw = Stopwatch.StartNew();
controlled.Subscribe(it => {
   Console.WriteLine($"{sw.ElapsedMilliseconds} - {it}");
});

Thread.Sleep((int)(3.1 * 1000)); // wait for 3.1 seconds
pause.OnNext(true); // pause it 
Thread.Sleep((int)(3.5 * 1000)); // wait for 3.5 seconds
pause.OnNext(false);  // resume
Thread.Sleep(5 * 1000); // and finally wait for 5 seconds

这将生成以下输出:

1017 - 0
2009 - 1
3007 - 2
6608 - 5
7007 - 6
8007 - 7
9007 - 8
10007 - 9
11007 - 10

如您所见,值34最初5被阻止/忽略,但是当暂停标志恢复5时,一旦暂停标志被重置(6.6 秒后),最后一个值就会再次“重新发送”。之后发出正常的间隔值。


推荐阅读