首页 > 解决方案 > 与 kotlins 流异步发出流值

问题描述

我正在使用 kotlin 和 webflux 构建一个简单的 Spring Service。

我有一个返回流的端点。该流包含需要很长时间才能计算出来的元素,这些元素由 a 模拟delay

它是这样构造的:

suspend fun latest(): Flow<Message> {
    println("generating messages")


    return flow {
        for (i in 0..20) {
            println("generating $i")
            if (i % 2 == 0) delay(1000)
            else delay(200)
            println("generated messsage $i")
            emit(generateMessage(i))
        }
        println("messages generated")
    }
}

我的期望是它会返回 Message1,然后是 Message3,Message5...,然后是 Message0,因为各个代的延迟不同。

但实际上,流程包含按顺序排列的元素。

我想我错过了一些关于协程和流程的重要信息,我尝试了不同的想法来实现我想要的协程,但我不知道如何。

解决方案

channelFlow正如 Marko Topolnik 和 William Reed 使用作品所指出的那样。

fun latest(): Flow<Message> {
    println("generating numbers")

    return channelFlow {
        for (i in 0..20) {
            launch {
                send(generateMessage(i))
            }
        }
    }
}

suspend fun generateMessage(i: Int): Message {
    println("generating $i")
    val time = measureTimeMillis {
        if (i % 2 == 0) delay(1000)
        else delay(500)
    }

    println("generated messsage $i in ${time}ms")
    return Message(UUID.randomUUID(), "This is Message $i")
}

运行时结果符合预期

generating numbers
generating 2
generating 0
generating 1
generating 6
...
generated messsage 5 in 501ms
generated messsage 9 in 501ms
generated messsage 13 in 501ms
generated messsage 15 in 505ms
generated messsage 4 in 1004ms
...

标签: kotlinkotlin-coroutinesflow

解决方案


一旦你同时计算每个元素,你的第一个问题将是弄清楚所有计算何时完成。

您必须提前知道预期有多少项目。所以对我来说构建一个平原List<Deferred<Message>>然后在返回整个东西之前等待所有延迟似乎很自然。在您的情况下,您没有从流程中获得任何里程,因为流程就是在流程集合内部同步地做事。

您还可以channelFlow与预期的已知消息数结合使用,然后基于此终止流。优点是 Spring 可以更早地开始收集流。


编辑

实际上,计数的问题并不存在:流程会自动等待您启动的所有子协程完成。


推荐阅读