kotlin-coroutines - 如何在 Kotlin Coroutines 中创建 MutableSharedFlow,类似于 RxJava 的 PublishSubject?
问题描述
PublishSubject
Kotlin Coroutines 库中是否有来自 RxJava 的等价物?
通道不能替代,PublishSubject
因为它们不会将值发布到多个收集器(每个值只能由单个收集器收集)。即使MutableSharedFlow
支持多个收集器,仍然不允许在不等待收集器完成处理先前值的情况下发出值。我们如何创建一个功能类似于的流程PublishSubject
?
解决方案
以下代码将创建一个Flow
等价于PublishSubject
:
fun <T> publishFlow(): MutableSharedFlow<T> {
return MutableSharedFlow(
replay = 0,
extraBufferCapacity = Int.MAX_VALUE
)
}
PublishSubject 的主要属性是它不会将旧值重播给新观察者,并且仍然允许发布新值/事件而无需等待观察者处理它们。因此,可以MutableSharedFlow
通过指定replay = 0
防止新收集器收集旧值并extraBufferCapacity = Int.MAX_VALUE
允许发布新值而不等待繁忙的收集器完成收集以前的值来实现此功能。
可以添加以下forceEmit
要调用的函数而不是tryEmit
,以确保实际发出该值:
fun <T> MutableSharedFlow<T>.forceEmit(value: T) {
val emitted = tryEmit(value)
check(emitted){ "Failed to emit into shared flow." }
}
由于我们有一个带MAX_VALUE
容量的缓冲区,因此forceEmit
如果我们将它与我们的publishFlow
. 如果流将被以某种方式替换为不支持在不挂起的情况下发出的不同流,我们将得到一个异常并且将知道如何处理缓冲区已满并且无法在没有挂起的情况下发出的情况。
请注意,MAX_VALUE
如果收集器收集值需要很长时间,则拥有容量缓冲区可能会导致大量内存消耗,因此它更适合收集器执行短同步操作的情况(类似于 RxJava 观察者)。
推荐阅读
- react-native - 如何模拟作为更新状态的回调的道具?
- javascript - 如何在按钮上方悬停几秒钟后出现类似工具提示的元素
- android - 从另一个类停止 CardView 动画
- css - CSS Font Face 声明
- excel - 选择导入文件的列并将它们以非订单形式复制并粘贴到另一张纸上
- ios - React 本机 UDP 不会从 iOS 发送任何消息,有人让它工作吗?
- reactjs - 在组件方法上使用反应上下文
- c - 通用原子通道,也可以与自定义结构一起使用
- css - 更改 Bootstrap 列表组活动项的样式
- kubernetes - 如何查看 Pod 日志:必须为 pod 指定容器名称... 选择以下之一:[wait main]