kotlin - 在 OpenShift 中部署的容器中运行多个协程
问题描述
我将以下 Kotlin 代码部署为 OpenShift 中的容器:
fun main(args: Array<String>): Unit = io.ktor.server.netty.EngineMain.main(args)
@kotlin.jvm.JvmOverloads
fun Application.module() {
launch { consumeProductionGeneratingUnits1hTopic() }
launch { consumeProductionLargeGeneratingUnits1hTopic() }
launch { consumeProductionAggregateProdType1hTopic() }
}
每个协程都只是在无限循环中从 kafka 主题中消费:
fun runCoroutine() {
val consumer = buildConsumer("topic")
while (true){
val record = consumer.poll(Duration.ofSeconds(30))
println(record.toString())
}
}
当我在本地运行此代码时,所有三个协程都已启动。但是,当我在 OpenShift 中将代码作为容器部署和运行时,只启动了前两个协程。看起来 OpenShift 最多支持两个协程。
有没有人经历过类似的事情?我试图为正在运行的 pod 保留更多的 cpu,但这不会影响协程的行为方式。
解决方案
首先要注意的是你没有使用挂起函数,所以这些协程只会用它们的while(true)
循环永远阻塞它们运行的线程。协程被设计为协作的,因此您需要暂停点以允许线程切换。
在当前的实现中,如果你在一个只有 2 个线程的线程池上调度,前 2 个协程将阻塞它们,而第三个协程将永远不会运行。一些协程调度程序使用的线程数量取决于可用内核的数量,这可以解释本地机器(可能超过 2 个内核)和容器(可能是 2 个内核)之间的行为差异。
我无法判断您是否在具有 2 个以上线程的线程池上分派这些协程,因为您没有显示launch
它们所在的协程范围(您的代码不应该按原样编译,除非您使用的是非常launch
没有接收器的顶级协程的旧版本CoroutineScope
?)。
解决方案
当然,您可以为您的 pod 分配更多核心,但这只是推动问题。
另一种选择是使用具有更多线程的线程池,但这也只是推动问题。
一个正确的修复 IMO 将实际使用转换为suspend
函数的异步 API。但是一个更简单(快速)的解决方法是保持您的代码原样,但只需在循环中添加对yield()的调用,以确保不时释放线程以供其他协程使用:
suspend fun runCoroutine() {
val consumer = buildConsumer("topic")
while (true){
val record = consumer.poll(Duration.ofSeconds(30))
println(record.toString())
yield() // ensures we suspend to free the thread
}
}
推荐阅读
- graphviz - 有没有办法使用graphviz中的fdp布局在同一集群中的节点之间与不同集群中的节点之间具有不同的边样条?
- excel - 双循环
- c# - 属性更改不会从任务内部更新 UI
- deep-learning - 用于二进制分类的最终线性层中的输出数量
- django - Django中的一对一关系
- python - 无法在ubuntu环境下安装autotest
- python - 如何使用数据结构找到具有给定眼睛颜色的人?
- python - PySpark:如果单行则将行合并到其父行,否则添加行是子行
- c++ - 指向 C++ 中未定义函数和参数的指针
- node.js - 使用 vue 和 axios 将图像发送到后端