首页 > 解决方案 > Kotlin Flow:当 Fragment 变得不可见时取消订阅 SharedFlow

问题描述

我读过类似的主题,但找不到正确的答案:

在我的Repository课堂上,我感冒Flow了,我想分享给 2 Presenters/ViewModels所以我的选择是使用shareIn运算符。

让我们看一下 Android 文档的示例:

val latestNews: Flow<List<ArticleHeadline>> = flow {
    ...
}.shareIn(
    externalScope,  // e.g. CoroutineScope(Dispatchers.IO)?
    replay = 1,
    started = SharingStarted.WhileSubscribed()
)

什么文档建议externalScope参数:

用于共享流的 CoroutineScope。这个范围应该比任何消费者都活得更长,以使共享流在需要时保持活动状态。

但是,在寻找关于如何停止订阅 aFlow的答案时,第二个链接中投票最多的答案是:

一个解决方案不是取消流程,而是取消它启动的范围。

对我来说,这些答案在SharedFlow's 的情况下是矛盾的。不幸的是,即使在调用它之后,我的Presenter/仍然会收到最新数据。ViewModelonCleared

如何防止这种情况?这是我如何在 / 中使用它FlowPresenter示例ViewModel

fun doSomethingUseful(): Flow<OtherModel> {
    return repository.latestNews.map(OtherModel)

如果这可能有帮助,我正在使用 MVI 架构,以便doSomethingUseful对用户创建的一些意图作出反应。

标签: androidkotlinkotlin-flowkotlin-coroutines

解决方案


我试图提供一个带有相关评论的最小示例。如前所述,SharedFlow 的工作方式与ConnectableObservableRxJava 中的 a 非常相似。上游只会被订阅一次,这意味着计算只对冷的上游流进行一次。您的存储库什么都不做,因为它是一个冷流,在SharedFlow订阅之前永远不会“收集”,因此它没有范围。

使用过 RxJava 和 Flow 有很多相似之处。Flow如果从基础 Reactive Streams 接口扩展而来,创建和接口似乎几乎没有必要,Collector并且可以使开发人员更容易过渡 - 但我不知道根本原因 - 也许他们希望使用新的 api 获得更大的灵活性,或者从 Java 9 实现和 RxJava 等另一个 Reactive Streams 实现中脱颖而出。

class MyViewModel : ViewModel(), CoroutineScope {

    override val coroutineContext: CoroutineContext = Dispatchers.Main.immediate + SupervisorJob() // optional + CoroutineExceptionHandler()

    private val latestNews: Flow<List<String>> = doSomethingUseful()
            .flowOn(Dispatchers.IO) // upstream will operate on this dispatch
            .shareIn(scope = this, // shared in this scope - becomes hot flow  (or use viewModelScope) for lifetime of your view model - will only connect to doSomethingUseful once for lifetime of scope
                     replay = 1,
                     started = SharingStarted.WhileSubscribed())


    fun connect() : Flow<List<String>> = latestNews // expose SharedFlow to "n" number of subscribers or same subscriber more than once

    override fun onCleared() {
        super.onCleared()
        cancel() // cancel the shared flow - this scope is finished
    }
}

class MainActivity : AppCompatActivity(), CoroutineScope {

    override val coroutineContext: CoroutineContext = Dispatchers.Main.immediate + SupervisorJob()

    private var job : Job? = null

    // supply the same view model instance on config changes for example - its scope is larger
    private val vm : MyViewModel by viewModels()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
    }

    override fun onStart() {
        super.onStart()

        job = launch {
            vm.connect().collect {
                // observe latest emission of hot flow and subsequent emissions if any - either reconnect or connect for first time
            }
        }
    }

    override fun onStop() {
        super.onStop()

        // cancel the job but latest news is still "alive" and receives emissions as it is running in a larger scope of this scope
        job?.cancel()
    }

    override fun onDestroy() {
        super.onDestroy()
        // completely cancel this scope - the ViewModel scope is unaffected
        cancel()
    }
}

推荐阅读