首页 > 解决方案 > ZIP 运算符不适用于 PublishSubject,我做错了什么?

问题描述

我是 RxJava 的新手,无法意识到 - 为什么当我使用两个 PublishSubject 时,我的“压缩”可观察对象不会发出项目?(据我所知 ZIP 运营商应该将两个流“合并”为一个)

val currentSubject = PublishSubject.create<Int>()
val maxSubject = PublishSubject.create<Int>()

currentSubject.onNext(1)
maxSubject.onNext(2)

currentSubject.onNext(1)
maxSubject.onNext(2)

Log.d("custom", "BINGO!")

val zipped = Observables.zip(currentSubject, maxSubject) { current, max -> "current : $current, max : $max " }
zipped.subscribe(
    { Log.d("custom", it) },
    { Log.d("custom", "BONGO!") },
    { Log.d("custom", "KONGO!") }
)

currentSubject.onComplete()
maxSubject.onComplete()

我期待这些项目出现在“{ Log.d("custom", it) }" 函数中,但它没有发生。我做错了什么?

编译后的日志:

2019-06-25 22:25:36.802 3631-3631/ru.grigoryev.rxjavatestdeleteafter D/custom: BINGO!

2019-06-25 22:25:36.873 3631-3631/ru.grigoryev.rxjavatestdeleteafter D/custom: KONGO!

标签: androidrx-java2

解决方案


这里的问题不在于您的zip实现,而在于 a 的默认行为PublishSubject。但首先,让我们备份

冷热观测

在 Rx 中,有和两种类型。最常见的类型是可观察的。一个可观察对象在被调用之前不会开始发射值。Obervableshotcoldcoldcold.subscribe()

val obs = Observable.fromIterable(listOf(1, 2, 3, 4);
obs.subscribe { print(it) }
// Prints 1, 2, 3, 4

hot无论观察者是否订阅了 observable,它都会发出值。

val subject = PublishSubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe { print(it) }
subject.onNext(3)
subject.onNext(4)

// Prints 3, 4

注意未打印的方式1和位置。2这是因为 aPublishSubject是一个hot可观察的1并且2在它被订阅之前发出。

回到你的问题

在您的示例中,您的发布主题在订阅之前发出 1 和 2。zipped一起查看它们,请移动您的代码。

val currentSubject = PublishSubject.create<Int>()
val maxSubject = PublishSubject.create<Int>()

Log.d("custom", "BINGO!")

val zipped = Observables.zip(currentSubject, maxSubject) { current, max -> "current : $current, max : $max " }
zipped.subscribe(
    { Log.d("custom", it) },
    { Log.d("custom", "BONGO!") },
    { Log.d("custom", "KONGO!") }
)

currentSubject.onNext(1)
maxSubject.onNext(2)

currentSubject.onNext(1)
maxSubject.onNext(2)


currentSubject.onComplete()
maxSubject.onComplete()

推荐阅读