首页 > 解决方案 > 用于单个事件或超时事件的 RxJava 运算符

问题描述

我有一个场景,比如我期待在特定时间内发生事件。如果在特定时间内没有发生任何事件,那么我也需要在那之后采取行动。

例如,我期待一个有效的事件3 sec,如果没有事件发生,3 sec我需要采取另一个行动。

为此,我使用过timeout,但在获得有效的事件超时后也会触发TimeoutException.

    ResultStatusRepo.resultStatus
        .subscribeOn(Schedulers.io())
        .timeout(3000, TimeUnit.MILLISECONDS)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({
            Timber.i("Result valid status: $it" )
        }, {
            Timber.e("Error: ${it.message}")
            if (it is TimeoutException) {
                Timber.i("Timeout for result")
            }
        })

resultStatus是类型Observable<Boolean>

如果我在 1 秒后得到结果而不是在 3 秒后再次获得结果,我就会TimeoutException进入onError。我知道timeout行为是这样的。对于我的场景,哪个运营商最适合。有没有其他办法。

提前致谢。

编辑:

结果状态回购

private val ownResultStatus = BehaviorSubject.create<Boolean>()
val resultStatus : Observable<Boolean> get() = ownResultStatus

标签: javakotlinreactive-programmingrx-java2

解决方案


下面的解决方案呢?它会帮助你吗?

import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.functions.Function
import io.reactivex.rxjava3.schedulers.TestScheduler
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

class TimeOutOverload {
    lateinit var sched: TestScheduler
    lateinit var merge: Observable<String>

    @BeforeEach
    fun setup() {
        sched = TestScheduler()
        merge = Observable.merge(
            Observable.timer(6, TimeUnit.SECONDS, sched),
            Observable.timer(20, TimeUnit.SECONDS, sched)
        ).map { "42_${it}" }
    }

    @Test
    fun `65671148 - times out on first`() {
        val test =
            merge.timeout(
                Observable.timer(5L, TimeUnit.SECONDS, sched),
                Function<String, Observable<String>> { _ -> Observable.never<String>() }
            ).test()

        sched.advanceTimeBy(60, TimeUnit.SECONDS)
        sched.triggerActions()

        test.assertError(TimeoutException::class.java)
    }

    @Test
    fun `65671148 - no timeout after first value arrived`() {
        val test =
            merge.timeout(
                Observable.timer(7L, TimeUnit.SECONDS, sched),
                Function<String, Observable<String>> { _ -> Observable.never<String>() }
            ).test()

        sched.advanceTimeBy(60, TimeUnit.SECONDS)
        sched.triggerActions()

        test.assertValues("42_0", "42_0")
    }
}

timeout操作员只会在第一个元素上触发。所有其他元素都将通过。

关于:

如果在 3 秒内没有发生任何事件,我需要采取另一项行动。

发生这种情况时,您必须使用onError*-Operator 来提供后备值TimeoutException

注意:我使用了与您的示例不同的值。


推荐阅读