rx-swift - RxSwift 以类似于 combineLatest 的顺序订阅最新元素
问题描述
假设我有一些 Observable 在我订阅它时可能有一些任意长的事件序列,但在我订阅后它也可能继续发出事件。我只对订阅时及之后的那些事件感兴趣。如何获取最新活动?
在此示例中,我使用 ReplaySubject 作为人工来源来说明问题。在实践中,这将是一些任意的 Observable。
let observable = ReplaySubject<Int>.createUnbounded()
observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)
_ = observable.subscribe(onNext: {
print($0)
})
observable.onNext(5)
observable.onNext(6)
observable.onNext(7)
产生输出:1 2 3 4 5 6 7
我真正想要的只是从订阅开始的事件。即 4 5 6 7
我可以将 combineLatest 与其他一些虚拟 Observable 一起使用:
let observable = ReplaySubject<Int>.createUnbounded()
observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)
_ = Observable.combineLatest(observable, Observable<Int>.just(42)) { value, _ in value }
.subscribe(onNext: {
print($0)
})
observable.onNext(5)
observable.onNext(6)
observable.onNext(7)
产生所需的输出 4 5 6 7
如何在不人为引入另一个任意 Observable 的情况下产生类似的结果?
我已经尝试了很多东西,包括 combineLatest 和一个只包含一个 observable 的数组,但它会发出完整的序列,而不仅仅是最新的。我知道我可以使用 PublishSubject 但我只是在这里使用 ReplaySubject 作为说明。
解决方案
默认情况下,observable 将为每个订阅者调用其生成器,并发出该生成器生成的所有值。例如:
let obs = Observable.create { observer in
for each in [1, 2, 3, 5, 7, 11] {
observer.onNext(each)
}
observer.onCompleted()
}
(注意以上是 的实现Observable.from(_:)
)
每次订阅obs
闭包时,都会调用所有 6 个 next 事件。这就是所谓的“冷”可观察对象,也是默认行为。假设 Observable 是冷的,除非你知道。
还有一个“热”可观察的概念。当有东西订阅它时,热的 observable不会调用它的生成器函数。
根据您的问题和随后的评论,听起来您想知道如何使冷的可观察到的热...基本方法是调用.multicast
它(或使用其实现的运算符之一,例如publish()
,replay(_:)
或replayAll()
。)还有一个特殊用途的操作符.share()
,它会“加热”一个 observable 并使其保持热状态,直到所有订阅者都取消订阅它(然后它会再次变冷。)当然,主题被认为是热的,因为它们没有要调用的生成器函数。
但是请注意,许多 observables 具有同步行为,这意味着它们将在订阅后立即发出所有值,因此在任何其他观察者(在该线程上)有机会订阅之前已经完成。
更多示例....interval(_:scheduler:)
是具有异步行为的冷可观察对象。假设您有以下内容:
let i = Observable<Int>.interval(.seconds(3), scheduler: MainScheduler.instance)
i.subscribe(onNext: { print($0, "from first") })
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
i.subscribe(onNext: { print($0, "from second") })
}
你会发现每个观察者都会得到它自己独立的值流(两者都以 0 开头),因为interval
两个观察者都调用了内部的生成器。所以你会看到如下输出:
0 from first
1 from first
0 from second
2 from first
1 from second
3 from first
2 from second
如果您多播间隔,您将看到不同的行为:
let i = Observable<Int>.interval(.seconds(3), scheduler: MainScheduler.instance)
.publish()
i.subscribe(onNext: { print($0, "from first") })
i.connect()
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
i.subscribe(onNext: { print($0, "from second") })
}
以上将产生:
0 from first
1 from first
1 from second
2 from first
2 from second
3 from first
3 from second
(请注意,“second”以 1 而不是 0 开头。)在这种情况下,共享运算符的工作方式与此相同,但您不必调用connect()
,因为它会自动调用。
最后,小心。如果你发布一个同步的 observable,你可能不会得到你期望的结果:
let i = Observable.from([1, 2, 3, 5])
.publish()
i.subscribe(onNext: { print($0, "from first") })
i.connect()
i.subscribe(onNext: { print($0, "from second") })
产生:
1 from first
2 from first
3 from first
5 from first
connect()
因为在第二个观察者有机会订阅之前,所有 5 个事件(四个下一个事件和完成的事件)都会在调用时立即发出。
一篇可能对您有所帮助的文章是Hot and Cold Observables,但它非常先进......
推荐阅读
- javascript - 为什么 JavaScript 在使用 .remove() 时没有删除我的元素?
- javascript - 加载 Laravel 图表 consoletvs/charts:7.*
- java - 使用 HashSet 清除重复值。列表返回空值或返回重复值
- python - 为什么我的不和谐机器人会引发此错误?
- c# - JsonSerializer.Serialize 只序列化基类的属性
- c# - 子弹轨迹中的死区 - unity 2d
- ios - UITableViewCell 公共功能未执行
- c++ - 是 std::promise
无锁? - xml - 用于解压缩 xlsx 并从工作表 xml 文件中读取内容的 Powershell 脚本
- javascript - chrome扩展打开网站弹出窗口后停止工作如何解决?