首页 > 解决方案 > RxSwift 以类似于 combineLatest 的顺序订阅最新元素

问题描述

假设我有一些 Observable 在我订阅它时可能有一些任意长的事件序列,但在我订阅后它也可能继续发出事件。我只对订阅时及之后的那些事件感兴趣。如何获取最新活动?

在此示例中,我使用 ReplaySubject 作为人工来源来说明问题。在实践中,这将是一些任意的 Observable。

let observable = ReplaySubject<Int>.createUnbounded()

observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)

_ = observable.subscribe(onNext: {
    print($0)
})
observable.onNext(5)
observable.onNext(6)
observable.onNext(7)

产生输出:1 2 3 4 5 6 7

我真正想要的只是从订阅开始的事件。即 4 5 6 7

我可以将 combineLatest 与其他一些虚拟 Observable 一起使用:

let observable = ReplaySubject<Int>.createUnbounded()

observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)

_ = Observable.combineLatest(observable, Observable<Int>.just(42)) { value, _ in value }
    .subscribe(onNext: {
    print($0)
})
observable.onNext(5)
observable.onNext(6)
observable.onNext(7)

产生所需的输出 4 5 6 7

如何在不人为引入另一个任意 Observable 的情况下产生类似的结果?

我已经尝试了很多东西,包括 combineLatest 和一个只包含一个 observable 的数组,但它会发出完整的序列,而不仅仅是最新的。我知道我可以使用 PublishSubject 但我只是在这里使用 ReplaySubject 作为说明。

标签: rx-swift

解决方案


默认情况下,observable 将为每个订阅者调用其生成器,并发出该生成器生成的所有值。例如:

let obs = Observable.create { observer in 
    for each in [1, 2, 3, 5, 7, 11] { 
        observer.onNext(each)
    }
    observer.onCompleted()
}

(注意以上是 的实现Observable.from(_:)

每次订阅obs闭包时,都会调用所有 6 个 next 事件。这就是所谓的“冷”可观察对象,也是默认行为。假设 Observable 是冷的,除非你知道。

还有一个“热”可观察的概念。当有东西订阅它时,热的 observable不会调用它的生成器函数。

根据您的问题和随后的评论,听起来您想知道如何使冷的可观察到的热...基本方法是调用.multicast它(或使用其实现的运算符之一,例如publish()replay(_:)replayAll()。)还有一个特殊用途的操作符.share(),它会“加热”一个 observable 并使其保持热状态,直到所有订阅者都取消订阅它(然后它会再次变冷。)当然,主题被认为是热的,因为它们没有要调用的生成器函数。

但是请注意,许多 observables 具有同步行为,这意味着它们将在订阅后立即发出所有值,因此在任何其他观察者(在该线程上)有机会订阅之前已经完成。

更多示例....interval(_:scheduler:)是具有异步行为的冷可观察对象。假设您有以下内容:

let i = Observable<Int>.interval(.seconds(3), scheduler: MainScheduler.instance)
i.subscribe(onNext: { print($0, "from first") })
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    i.subscribe(onNext: { print($0, "from second") })
}

你会发现每个观察者都会得到它自己独立的值流(两者都以 0 开头),因为interval两个观察者都调用了内部的生成器。所以你会看到如下输出:

0 from first
1 from first
0 from second
2 from first
1 from second
3 from first
2 from second

如果您多播间隔,您将看到不同的行为:

let i = Observable<Int>.interval(.seconds(3), scheduler: MainScheduler.instance)
    .publish()
i.subscribe(onNext: { print($0, "from first") })
i.connect()
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    i.subscribe(onNext: { print($0, "from second") })
}

以上将产生:

0 from first
1 from first
1 from second
2 from first
2 from second
3 from first
3 from second

(请注意,“second”以 1 而不是 0 开头。)在这种情况下,共享运算符的工作方式与此相同,但您不必调用connect(),因为它会自动调用。

最后,小心。如果你发布一个同步的 observable,你可能不会得到你期望的结果:

let i = Observable.from([1, 2, 3, 5])
    .publish()
i.subscribe(onNext: { print($0, "from first") })
i.connect()
i.subscribe(onNext: { print($0, "from second") })

产生:

1 from first
2 from first
3 from first
5 from first

connect()因为在第二个观察者有机会订阅之前,所有 5 个事件(四个下一个事件和完成的事件)都会在调用时立即发出。

一篇可能对您有所帮助的文章是Hot and Cold Observables,但它非常先进......


推荐阅读