首页 > 解决方案 > 在 OpenShift 中部署的容器中运行多个协程

问题描述

我将以下 Kotlin 代码部署为 OpenShift 中的容器:

fun main(args: Array<String>): Unit = io.ktor.server.netty.EngineMain.main(args)

@kotlin.jvm.JvmOverloads
fun Application.module() {

    launch { consumeProductionGeneratingUnits1hTopic() }
    launch { consumeProductionLargeGeneratingUnits1hTopic() }
    launch { consumeProductionAggregateProdType1hTopic() }

}

每个协程都只是在无限循环中从 kafka 主题中消费:

fun runCoroutine() {
    val consumer = buildConsumer("topic")
    while (true){
        val record = consumer.poll(Duration.ofSeconds(30))
        println(record.toString())
    }
}

当我在本地运行此代码时,所有三个协程都已启动。但是,当我在 OpenShift 中将代码作为容器部署和运行时,只启动了前两个协程。看起来 OpenShift 最多支持两个协程。

有没有人经历过类似的事情?我试图为正在运行的 pod 保留更多的 cpu,但这不会影响协程的行为方式。

标签: kotlinopenshiftkotlin-coroutinesktor

解决方案


首先要注意的是你没有使用挂起函数,所以这些协程只会用它们的while(true)循环永远阻塞它们运行的​​线程。协程被设计为协作的,因此您需要暂停点以允许线程切换。

在当前的实现中,如果你在一个只有 2 个线程的线程池上调度,前 2 个协程将阻塞它们,而第三个协程将永远不会运行。一些协程调度程序使用的线程数量取决于可用内核的数量,这可以解释本地机器(可能超过 2 个内核)和容器(可能是 2 个内核)之间的行为差​​异。

我无法判断您是否在具有 2 个以上线程的线程池上分派这些协程,因为您没有显示launch它们所在的协程范围(您的代码不应该按原样编译,除非您使用的是非常launch没有接收器的顶级协程的旧版本CoroutineScope?)。

解决方案

当然,您可以为您的 pod 分配更多核心,但这只是推动问题。

另一种选择是使用具有更多线程的线程池,但这也只是推动问题。

一个正确的修复 IMO 将实际使用转换为suspend函数的异步 API。但是一个更简单(快速)的解决方法是保持您的代码原样,但只需在循环中添加对yield()的调用,以确保不时释放线程以供其他协程使用:

suspend fun runCoroutine() {
    val consumer = buildConsumer("topic")
    while (true){
        val record = consumer.poll(Duration.ofSeconds(30))
        println(record.toString())
        yield() // ensures we suspend to free the thread
    }
}

推荐阅读