首页 > 解决方案 > 如何在 Kotlin Coroutines 中创建 MutableSharedFlow,类似于 RxJava 的 PublishSubject?

问题描述

PublishSubjectKotlin Coroutines 库中是否有来自 RxJava 的等价物?

通道不能替代,PublishSubject因为它们不会将值发布到多个收集器(每个值只能由单个收集器收集)。即使MutableSharedFlow支持多个收集器,仍然不允许在不等待收集器完成处理先前值的情况下发出值。我们如何创建一个功能类似于的流程PublishSubject

标签: kotlin-coroutines

解决方案


以下代码将创建一个Flow等价于PublishSubject

fun <T> publishFlow(): MutableSharedFlow<T> {
    return MutableSharedFlow(
        replay = 0,
        extraBufferCapacity = Int.MAX_VALUE
    )
}

PublishSubject 的主要属性是它不会将旧值重播给新观察者,并且仍然允许发布新值/事件而无需等待观察者处理它们。因此,可以MutableSharedFlow通过指定replay = 0 防止新收集器收集旧值并extraBufferCapacity = Int.MAX_VALUE允许发布新值而不等待繁忙的收集器完成收集以前的值来实现此功能。

可以添加以下forceEmit要调用的函数而不是tryEmit,以确保实际发出该值:

fun <T> MutableSharedFlow<T>.forceEmit(value: T) {
    val emitted = tryEmit(value)
    check(emitted){ "Failed to emit into shared flow." }
}

由于我们有一个带MAX_VALUE容量的缓冲区,因此forceEmit如果我们将它与我们的publishFlow. 如果流将被以某种方式替换为不支持在不挂起的情况下发出的不同流,我们将得到一个异常并且将知道如何处理缓冲区已满并且无法在没有挂起的情况下发出的情况。

请注意,MAX_VALUE如果收集器收集值需要很长时间,则拥有容量缓冲区可能会导致大量内存消耗,因此它更适合收集器执行短同步操作的情况(类似于 RxJava 观察者)。


推荐阅读