首页 > 解决方案 > 流 - 暂停/恢复流

问题描述

在 RxJava 中,有一个valve操作符允许暂停(和缓冲)流并再次恢复流(并且在恢复后立即发出缓冲的值)。它是 rx java 扩展的一部分(https://github.com/akarnokd/RxJavaExtensions/blob/3.x/src/main/java/hu/akarnokd/rxjava3/operators/FlowableValve.java)。

kotlin 流有这样的东西吗?

我的用例是我想观察活动中的流程并且永远不会丢失事件(就像我会这样做,LiveData例如,如果活动暂停,它将停止观察数据)。因此,当活动暂停时,我希望流程缓冲观察到的值,直到活动恢复,并在活动恢复后立即将它们全部发出。

因此,当活动被创建(直到它被销毁)时,我想观察流程但我只想在活动处于活动状态时发出值,并在活动不活动时缓冲值(但仍被创建),直到它再次被激活。

有什么东西可以解决这个问题,或者有没有人写过东西来解决这个问题?

标签: kotlinflowkotlin-flow

解决方案


我知道这是丑陋的解决方案,但对我来说效果很好:

fun main() {
    val flow = MutableSharedFlow<String>(extraBufferCapacity = 50, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    val isOpened = AtomicBoolean()

    val startTime = System.currentTimeMillis()
    GlobalScope.launch(Executors.newSingleThreadExecutor().asCoroutineDispatcher()) {
        flow
            .transform { value ->
                while (isOpened.get().not()) { }
                emit(value)
            }
            .collect {
                println("${System.currentTimeMillis() - startTime}: $it")
            }
    }

    Thread.sleep(1000)
    flow.tryEmit("First")
    Thread.sleep(1000)
    isOpened.set(true)
    flow.tryEmit("Second")
    isOpened.set(false)
    Thread.sleep(1000)
    isOpened.set(true)
    flow.tryEmit("Third")
    Thread.sleep(2000)
}

结果: 代码执行结果

因此,您可以在活动生命周期暂停时将 isOpened 设置为 false,在恢复时将其设置为 true。


推荐阅读