c# - GroupJoin,一个窗口在流完成时结束
问题描述
我有两个数据源,它们从单独的线程转储数据。我正在尝试通过一个键加入这两个来源。我可以使用 GroupJoin 来做到这一点。我必须使用 Observable.Never 以便窗口永远不会结束。在加入流方面一切正常。当两个来源都完成转储数据时,他们在 Observers 上调用各自的 onComplete。我希望在收到两个源的 OnComplete 后立即结束流式传输。因为我使用过 Observable.Never 流永远不会结束,并且永远不会调用我的 aggSource 的 Oncomplete 事件。
当两个源都收到 OnComplete 时,有没有办法告诉 Rx 关闭窗口,而不是无限保持打开状态?
我是 Rx 的新手,不确定这是否可以实现。下面是代码片段。提前致谢!!
var l = Source1;
var r = Source2;
var q = r.GroupJoin(l,
_ => Observable.Never<Unit>(), // windows from each left event going on forever
_ => Observable.Never<Unit>(), // windows from each right event going on forever
(left, obsOfRight) => Tuple.Create(left, obsOfRight)); // create tuple of left event with observable of right events
var joinSource = q.SelectMany(e => {
return e.Item2.Where(
x =>
{
return x.ID== e.Item1.ID;
})
.Select(v=> (Item1:v.Value, Item2: e.Item1.Value));
});
var aggSource = joinSource.GroupBy(x => x.Item1).SelectMany(grp =>
{
return grp.Scan(0.0, (accumulator, current) => accumulator + current.Item2).Select(z => (Group: grp.Key, Value: z));
});
aggSource.Subscribe(x => dictResults[x.Group] = x,
y => { Console.WriteLine("Error Ocurred: " + y.Message); completed = true; },
() => { completed = true; Console.WriteLine("Subcription comnpleted"); }
);
// dict results is dictionary which is my projection which is shown to View. Right now my view is just console window.
解决方案
持续时间选择器控制连接窗口重叠——当任一源完成时,我们需要将其缩短。首先,我们将用于LastOrDefaultAsync
在发出 an 时获取通知OnComplete
。
var either = Observable.CombineLatest(l.LastOrDefaultAsync(), r.LastOrDefaultAsync());
现在我们可以从示例中修改持续时间选择器:
var q = r.GroupJoin(l,
_ => Observable.Never<Unit>().TakeUntil(either), // windows from each left event until l or r completes
_ => Observable.Never<Unit>().TakeUntil(either), // windows from each right event until l or r completes
(left, obsOfRight) => Tuple.Create(left, obsOfRight)); // create tuple of left event with observable of right events
这应该会导致级联并完成可观察管道的其余部分。
推荐阅读
- css - Nginx 不会提供静态文件
- drupal-8 - Drupal 8 使用产品类型和分类术语查看上下文过滤器
- android - 启动画面尝试仅显示黑屏
- swift - 连续的单元测试失败,尽管它们单独成功
- javascript - 将页脚粘贴到页面底部
- angular - rxjs:使用 rxjs 自定义运算符更改 HttpRequest
- security - 验证 Tuxedo 服务身份验证和加密
- android - 在 Android Auto 中向服务请求权限
- java - 读取带有注释的属性文件时出现 Vert.x-Config 异常
- javascript - 如果原始空白,Javascript推送到数组添加逗号