首页 > 解决方案 > 在最后插入 observable 之后,switchOnNext 运算符不会为订阅发出

问题描述

首先,一些背景(也许有更好的方法):

我们有一个模块可以在特定的 Observable 上发出传入的蓝牙消息。然后我们处理这些消息,最后在最后订阅以发送消息。这种处理可能会在某个时候发生变化,这对于大多数处理意味着重新创建中间 Observables,以及依赖它的所有 observables(因为它们现在将处理无效数据)。

我们想改变它,以便重新创建处理的某些部分不需要重新创建依赖它的所有内容,主要是因为我们不必一直记住什么依赖于什么,并且还让具有内部状态的操作符(如缓冲、扫描或去抖动)不会丢失这个内部状态。

有希望的解决方案:

通过使用 switchOnNext 操作符,我们可以解决这个问题。每当重新创建中间 observable 时,我们只需将其添加到 switchOnNext 的原点,订阅 switchOnNext 输出的任何人都会立即获得新结果。

问题:

如果 switchOnNext 之后的处理必须改变,它将停止获取结果,直到之前的 observable 改变。这意味着我们现在遇到了相反的问题。每当某些部分发生变化时,我们必须递归地重新创建它所依赖的所有内容。这稍微好一点(跟踪依赖的东西比跟踪依赖它的所有东西要容易得多),但是 observables 仍然会丢失内部状态,因为它们必须重新创建。

这种行为似乎违反了文档所说的应该发生的事情,但它并没有明确说明一种或另一种方式。

示例代码:

此代码演示了该问题。

import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

fun main() {
    //Observable of observables
    val publishSubject: PublishSubject<Observable<Int>> = PublishSubject.create()
    //Observable to subscribe to get the most recent values
    val observable: Observable<Int> = Observable.switchOnNext(publishSubject)

    observable.subscribe { println("1: $it") }
    //Now 1 is subscribed

    val obsAux1 = PublishSubject.create<Int>()

    observable.subscribe { println("2: $it") }
    //Now 1 and 2 are subscribed

    publishSubject.onNext(obsAux1)

    observable.subscribe { println("3: $it") }
    //Now 1, 2 and 3 are subscribed

    //Should print out from subscriptions 1, 2 and 3, but only 1 and 2 printed
    obsAux1.onNext(1)

    val obsAux2 = PublishSubject.create<Int>()

    publishSubject.onNext(obsAux2)

    observable.subscribe { println("4: $it") }
    //Now 1, 2, 3 and 4 are subscribed

    //Should not print anything
    obsAux1.onNext(2)
    //Should print out from subscriptions 1, 2, 3 and 4, but only 1, 2 and 3 printed
    obsAux2.onNext(3)
} 

此代码的输出:

1: 1
2: 1
1: 3
2: 3
3: 3

预期输出:

1: 1
2: 1
3: 1 <--- This is missing
1: 3
2: 3
3: 3
4: 3 <--- This is missing

第一次 obsAux1 发出时,所有三个订阅都应该打印出来,但只有在它被添加到 publishSubject 之前的那些会打印出来。

第二次 obsAux1 发出时,不应打印任何内容,因为 obsAux2 已经插入。这按预期工作

obsAux2 第一次发出时,所有四个订阅都应该打印。第三个订阅按预期打印,这应该订阅工作正常。但是第四个订阅没有打印任何内容,因为它是在将 obsAux2 插入到 publishSubject 之后添加的。

标签: kotlinrx-kotlin2

解决方案


您应该像这样使用 switchOnNext。在他们提到的文档中

{@code switchOnNext} 订阅一个发出 ObservableSources 的 ObservableSource。每次它观察到其中一个发出的 ObservableSource 时,由 {@code switchOnNext} 返回的 ObservableSource 开始发出由该 ObservableSource 发出的项目。当一个新的 ObservableSource 发出时,{@code switchOnNext} 停止从之前发出的 ObservableSource 发出项目,并开始从新的 ObservableSource 发出项目。

以下代码将给出您预期的输出。根据逻辑,您将按此顺序更改它..

fun main()
{
val publishSubject: PublishSubject<Observable<Int>> = PublishSubject.create()
val observable: Observable<Int> = Observable.switchOnNext(publishSubject)
observable.subscribe { println("1: $it") }

val obsAux1 = PublishSubject.create<Int>()
observable.subscribe { println("2: $it") }
observable.subscribe { println("3: $it") }
publishSubject.onNext(obsAux1)
obsAux1.onNext(1)

val obsAux2 = PublishSubject.create<Int>()
publishSubject.onNext(obsAux2)
observable.subscribe { println("4: $it") }
obsAux1.onNext(2)

publishSubject.onNext(obsAux2)
obsAux2.onNext(3) 
}

推荐阅读