首页 > 解决方案 > onEach 更改 StateFlow 中的调度程序(kotlin 协程)

问题描述

想象一下以下自包含的测试用例

@Test
fun `stateFlow in GlobalScope`() = runBlockingTest {

    suspend fun makeHeavyRequest(): String {
        return "heavy result"
    }

    val flow1 = flowOf(Unit)
        .map { makeHeavyRequest() }
        .onEach { logThread("1: before flowOn") }
        .flowOn(testDispatcher)
        .stateIn(GlobalScope, SharingStarted.Lazily, "init state")

    val flow2 = flowOf(Unit)
        .map { makeHeavyRequest() }
        .onEach { logThread("2: before flowOn") }
        .flowOn(testDispatcher)
        .stateIn(GlobalScope, SharingStarted.Lazily, "init state")
        .onEach { logThread("2: after stateIn") }

    val flow3 = flowOf(Unit)
        .map { makeHeavyRequest() }
        .onEach { logThread("3: before flowOn") }
        .flowOn(testDispatcher)
        .onEach { logThread("3: after flowOn") }
        .stateIn(GlobalScope, SharingStarted.Lazily, "init state")

    flow1.test {
        assertEquals("heavy result", expectItem())
        cancelAndIgnoreRemainingEvents()
    }

    flow2.test {
        assertEquals("heavy result", expectItem())
        cancelAndIgnoreRemainingEvents()
    }

    flow3.test {
        assertEquals("heavy result", expectItem())
        cancelAndIgnoreRemainingEvents()
    }

}

运行它的效果将是:

Thread (1: before flowOn): Thread[main @coroutine#2,5,main]
Thread (2: before flowOn): Thread[main @coroutine#3,5,main]
Thread (2: after stateIn): Thread[main @coroutine#6,5,main]
Thread (3: before flowOn): Thread[DefaultDispatcher-worker-1 @coroutine#8,5,main]
Thread (3: after flowOn): Thread[DefaultDispatcher-worker-1 @coroutine#4,5,main]


org.opentest4j.AssertionFailedError: 
Expected :heavy result
Actual   :init state

flow3放置onEach之间flowOnstateIn完全改变调度程序并弄乱结果。这是为什么?

标签: kotlinkotlin-coroutines

解决方案


发生这种情况的原因是,stateIn运算符根据上游流是否为 a 进行了一些优化ChannelFlow
.flowOn(...)返回一段ChannelFlow时间.onEach(...)没有。

通常这并不重要。为什么它在您的情况下很重要,是因为您希望返回的流stateIn永远不会发出初始值。但是这个参数是强制性的,你应该期望收到初始值是有原因的。您是否真的这样做主要取决于上游流是否能够在不暂停的情况下发出值。

现在看来,stateIn运营商的优化之一是,它可能会在不暂停的情况下消耗 ChannelFlow。这就是为什么您在使用时会得到预期的行为

.flowOn(testDispatcher) /*returns ChannelFlow*/
.stateIn(GlobalScope, SharingStarted.Lazily, "init state")

推荐阅读