kotlin - 如何跳过流并仅发送最后发出的值
问题描述
我正在开发一个 Web 应用程序,以可视化基于 Websockets 的迷宫生成和求解算法。生成算法实现接口 IMazeGenerator 并返回一个 Flow。
interface IMazeGenerator {
fun generate(maze: Maze): Flow<Maze>
}
在生成器算法的每一步之后,都会发出更新的迷宫。然后,收集器会在短暂延迟后将中间步骤发送给客户端。
generatorFlow
.map { it.toDto() }
.delay(150)
.onEach {
client.send(UpdateMaze(it))
}
.launchIn(this.scope)
现在我的问题是:我想为客户提供跳过中间步骤的可能性,然后只发送最后一个结果,即最终的迷宫。为此,我首先需要能够确定发射器是否已经完成(即最终迷宫已经发射),如果是,则在跳过命令到来时仅发送最后一个结果。
不幸的是,我现在根本不知道,我什至不确定这是否适用于流程。欢迎任何帮助。
解决方案
谢谢marstran,不幸的是这还没有直接起作用,但你给了我必要的灵感。让我先解释问题,然后是我的解决方案。
一旦触发生成,流程就会启动。如果客户端想在某个点之后跳过所有进一步的中间步骤,我需要从这个流中传输最后一个发射。使用您的解决方案,问题是在这种情况下我会重新启动流程,并获得不同的结果,因为某些算法是不确定的。另一个问题是我首先必须确定发射器是否已经计算了最后一个结果,因为某些算法确实需要一些时间。此时,流程将是可跳过的。因此,我还必须使用缓冲区来忽略背压。
我现在创建了一个扩展函数,它接收最后一个发出值的回调函数。
fun <T> Flow<T>.finalValueCallback(block: suspend (T?) -> Unit) = flow {
var finalValue: T? = null
collect {
emit(it)
finalValue = it
}
block(finalValue)
}
然后我使用这个回调来保存当前流的最后一个结果。如果出现跳过命令,我会中断流程并仅发送最后一个结果。
generatorJob = generatorFlow
.onStart { client.send(UpdateGeneratorState(GeneratorState.RUNNING)) }
.map { it.toDto() }
.finalValueCallback {
finalMaze = it
client.send(UpdateGeneratorState(GeneratorState.SKIPPABLE))
}
.buffer()
.delay(150)
.onEach {
client.send(UpdateMaze(it))
}
.onCompletion { cause ->
finalValue = null
if (cause == null || cause is FlowSkippedException) {
client.send(UpdateGeneratorState(GeneratorState.COMPLETED))
} else {
client.send(UpdateGeneratorState(GeneratorState.CANCELLED))
}
}
.launchIn(this.scope)
推荐阅读
- java - 通过 FXML 管理 JavaFX 中的场景切换(性能问题)
- visual-studio - Visual Studio 2017 F# 中的命名空间问题
- c# - 如何从注册表中确定 Windows Server 2016 版本
- javascript - 离子必须在按钮操作后保存状态
- shell - 使用 POST 在 curl 命令中传递带空格的值
- redis - Kubernetes Redis 集群 PubSub 通道未在副本上同步
- python - 如何在 Python 中重命名 PDF 文件中的连线平均值?
- visual-studio - 创建一个 Nuget 包,不将其依赖项添加为引用
- java - 我应该如何描述这种创建随机对象的方法?
- asp.net-core - 令牌服务器上自定义端点的 Identity Server 4 客户端凭据