c# - Rx.NET“门”操作员
问题描述
[注意:如果这很重要,我正在使用 3.1。另外,我在 codereview 上问过这个问题,但到目前为止没有任何回应。]
我需要一个运算符来允许布尔值流充当另一个流的门(当门流为真时让值通过,当门流为假时丢弃它们)。我通常会为此使用 Switch,但如果源流很冷,它将继续重新创建它,这是我不想要的。
我也想自己清理一下,以便在源或门完成时结果完成。
public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
var s = source.Publish().RefCount();
var g = gate.Publish().RefCount();
var sourceCompleted = s.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);
var gateCompleted = g.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);
var anyCompleted = Observable.Amb(sourceCompleted, gateCompleted);
var flag = false;
g.TakeUntil(anyCompleted).Subscribe(value => flag = value);
return s.Where(_ => flag).TakeUntil(anyCompleted);
}
除了整体冗长之外,我不喜欢订阅门,即使结果从未订阅(在这种情况下,此运算符应该是无操作的)。有没有办法摆脱那个订阅?
我也尝试过这种实现,但在自行清理时更糟糕:
return Observable.Create<T>(
o =>
{
var flag = false;
gate.Subscribe(value => flag = value);
return source.Subscribe(
value =>
{
if (flag) o.OnNext(value);
});
});
这些是我用来检查实现的测试:
[TestMethod]
public void TestMethod1()
{
var output = new List<int>();
var source = new Subject<int>();
var gate = new Subject<bool>();
var result = source.When(gate);
result.Subscribe(output.Add, () => output.Add(-1));
// the gate starts with false, so the source events are ignored
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
CollectionAssert.AreEqual(new int[0], output);
// setting the gate to true will let the source events pass
gate.OnNext(true);
source.OnNext(4);
CollectionAssert.AreEqual(new[] { 4 }, output);
source.OnNext(5);
CollectionAssert.AreEqual(new[] { 4, 5 }, output);
// setting the gate to false stops source events from propagating again
gate.OnNext(false);
source.OnNext(6);
source.OnNext(7);
CollectionAssert.AreEqual(new[] { 4, 5 }, output);
// completing the source also completes the result
source.OnCompleted();
CollectionAssert.AreEqual(new[] { 4, 5, -1 }, output);
}
[TestMethod]
public void TestMethod2()
{
// completing the gate also completes the result
var output = new List<int>();
var source = new Subject<int>();
var gate = new Subject<bool>();
var result = source.When(gate);
result.Subscribe(output.Add, () => output.Add(-1));
gate.OnCompleted();
CollectionAssert.AreEqual(new[] { -1 }, output);
}
解决方案
这有效:
public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
return
source.Publish(ss => gate.Publish(gs =>
gs
.Select(g => g ? ss : ss.IgnoreElements())
.Switch()
.TakeUntil(Observable.Amb(
ss.Select(s => true).Materialize().LastAsync(),
gs.Materialize().LastAsync()))));
}
这通过了两项测试。
推荐阅读
- javascript - 是否可以控制 HTML Razor 组件的渲染顺序?
- r - 使用 R 遍历来自 csv 的 URL 列表,打开 URL,并评估这些站点是否包含某个文本字符串
- groovy - 用户一直在说“十二三十四”而不是“一二三四”
- react-native - componentWillReceiveProps 被调用不在当前屏幕反应原生
- python - NameError 提示玩家输入猜测后
- sql - 更新语句在预填充的列上生成空违规
- sql - SQL根据参数进行不同的查询?
- c# - 尝试使用 .Net Core Framework 3.0 让 .Net Boxed Framework 工作
- c# - Xamarin Android:为什么对 Azure Active Directory 的身份验证不起作用
- python - 是否可以将 QInputDialog 与嵌套列表或类似的东西一起使用?