system.reactive - 拆分 IObservable进入 IObservable>
问题描述
我有一个源流,它有时会发出某个标记值来指定新流的开始。我想将我的流转换为IObservable<IObservable<T>>
. 谁能想到一个优雅的方式?
解决方案
这应该可以解决问题:
observable = observable
.Publish()
.RefCount();
var splitted = observable
.Window(observable.Where(x => x == SENTINEL))
.Select(c => c.Where(x => x != SENTINEL));
完整示例:
const int SENTINEL = -1;
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Select(x => x + 1)
.Take(12)
.Select(x => x % 5 == 0 ? SENTINEL : x) // Every fifth is a sentinel
.Publish()
.RefCount();
observable
.Window(observable.Where(x => x == SENTINEL))
.Select(c => c.Where(x => x != SENTINEL))
.Select((c, i) => c.Select(x => (i, x))) // Embed the index of the subsequence
.Merge() // Merge them again
.Do(x => Console.WriteLine($"Received: {x}"))
.Subscribe();
await observable.LastOrDefaultAsync(); // Wait it to end
输出:
已收到:(0, 1)
已收到:(0, 2)
已收到:(0, 3)
已收到:(0, 4)
已收到:(1, 6)
已收到:(1, 7)
已收到:(1, 8)
已收到: (1, 9)
收到: (2, 11)
收到: (2, 12)
推荐阅读
- javascript - 一个月后在本机反应中删除异步存储
- tensorflow - 将 tensorflow 属性添加到 Config 类中
- windows-subsystem-for-linux - 在 WSL Ubuntu 上启用 SSH 服务器并从客户端连接到 SSH
- amazon-web-services - Amplify AppSync:自定义排序和分页过滤
- javascript - defer 不适用于 laravel 6 刀片中的 jquery.min.js
- python - 查找匹配和不匹配的 recs 熊猫
- sql - Oracle中如何根据列名选择列
- jasper-reports - 比较具有多个值的字符串
- azure - Azure 存储库:拉取请求中的无限合并冲突
- ruby-on-rails - 在 VS CODE 中使用 ruby-debug-ide 调试 Rspec