首页 > 解决方案 > RxJava2 如果元素当前存在于流中,则忽略它

问题描述

如果元素当前出现在反应流中,我需要删除/忽略元素。例如:

fun ignoreDuplicatesExample() {

    val publishSubject: PublishSubject<Long> = PublishSubject.create()
    publishSubject.observeOn(Schedulers.single()).distinct().subscribe({
        Thread.sleep(1000)
        println("onNext: $it")
    }, {
        error("$it")
    })

    publishSubject.onNext(1)
    publishSubject.onNext(2)
    publishSubject.onNext(3)

    publishSubject.onNext(1) // should be ignored
    publishSubject.onNext(2) // should be ignored
    publishSubject.onNext(3) // should be ignored

    Thread.sleep(10_000)
    publishSubject.onNext(1) // by this time it should be already consumed, so it need to be allowed to emit it again
    publishSubject.onNext(4)

    Thread.sleep(10_000)
    println("exit")
}

输出:

onNext: 1
onNext: 2
onNext: 3
onNext: 4
exit

但我希望看到:

onNext: 1
onNext: 2
onNext: 3
onNext: 1
onNext: 4
exit

那么,有人知道如何使用 RxJava2 来实现它吗?

标签: kotlinrx-java2

解决方案


你不能直接做到这一点,因为上游链不应该知道下游,如果它是“消费”或不。(想象一下,如果有多个订阅者。)如果你想这样做,你需要与流之外的变量进行交互。


推荐阅读