kotlin - 与 kotlins 流异步发出流值
问题描述
我正在使用 kotlin 和 webflux 构建一个简单的 Spring Service。
我有一个返回流的端点。该流包含需要很长时间才能计算出来的元素,这些元素由 a 模拟delay
。
它是这样构造的:
suspend fun latest(): Flow<Message> {
println("generating messages")
return flow {
for (i in 0..20) {
println("generating $i")
if (i % 2 == 0) delay(1000)
else delay(200)
println("generated messsage $i")
emit(generateMessage(i))
}
println("messages generated")
}
}
我的期望是它会返回 Message1,然后是 Message3,Message5...,然后是 Message0,因为各个代的延迟不同。
但实际上,流程包含按顺序排列的元素。
我想我错过了一些关于协程和流程的重要信息,我尝试了不同的想法来实现我想要的协程,但我不知道如何。
解决方案
channelFlow
正如 Marko Topolnik 和 William Reed 使用作品所指出的那样。
fun latest(): Flow<Message> {
println("generating numbers")
return channelFlow {
for (i in 0..20) {
launch {
send(generateMessage(i))
}
}
}
}
suspend fun generateMessage(i: Int): Message {
println("generating $i")
val time = measureTimeMillis {
if (i % 2 == 0) delay(1000)
else delay(500)
}
println("generated messsage $i in ${time}ms")
return Message(UUID.randomUUID(), "This is Message $i")
}
运行时结果符合预期
generating numbers
generating 2
generating 0
generating 1
generating 6
...
generated messsage 5 in 501ms
generated messsage 9 in 501ms
generated messsage 13 in 501ms
generated messsage 15 in 505ms
generated messsage 4 in 1004ms
...
解决方案
一旦你同时计算每个元素,你的第一个问题将是弄清楚所有计算何时完成。
您必须提前知道预期有多少项目。所以对我来说构建一个平原List<Deferred<Message>>
然后在返回整个东西之前等待所有延迟似乎很自然。在您的情况下,您没有从流程中获得任何里程,因为流程就是在流程集合内部同步地做事。
您还可以channelFlow
与预期的已知消息数结合使用,然后基于此终止流。优点是 Spring 可以更早地开始收集流。
编辑
实际上,计数的问题并不存在:流程会自动等待您启动的所有子协程完成。
推荐阅读
- jenkins - 带有 pipeline-github 插件的 Jenkins 多分支管道:首次运行的限制
- sql-server - Outlook、VBA 宏、到 SQL Server Express 的连接字符串
- subquery - 即使我将限制设置为 5,如何在列中显示重复的结果?
- c - 在 C 中集成复合函数
- javascript - 如何从另一个访问堆栈导航器的导航道具
- javascript - 此代码是否用于确定圆和线段是否相交正确?
- typescript - 作为道具传递与提取缓存 Apollo 客户端 Nextjs
- node.js - Typeorm 交易不发布
- rust - 了解克隆借用
- html - Bootstrap 4轮播控制按钮不起作用