kotlin - Kotlin 协程 isActive 即使协程没有运行也为真
问题描述
我有一个工作人员从两个 AWS SQS 队列中消费消息,这个工作人员是用 kotlin 编写的,并且有 1 个协程来消费队列消息,这个消费的消息被插入到一个通道中,并且每个队列有 10 个协程从这个通道消费。
队列 1 - 1 个消费消息的协程,10 个处理消息的
协程 队列 2 - 1 个消费消息的协程,10 个处理消息的协程
这个工人还有一个“协程监视器”,每个协程被插入到一个列表中,并在 kubernetes 健康检查中检查协程状态。
由于某种原因,协程停止了,但协程的状态仍然处于活动状态,并且健康检查仍然没有重新启动 pod
有没有人有任何建议来解决这个问题?
代码与本文基本一致:https ://jivimberg.io/blog/2019/02/23/sqs-consumer-using-kotlin-coroutines/
健康检查码
class CoroutineHealth {
private lateinit var jobs: List<Job>
private var numberOfWorkers: Long = 0
companion object {
val logger = KotlinLogging.logger("com.ifood.cx.config.health.CoroutineHealth")
}
fun updateJobsNumber(jobs: List<Job>, numberOfWorkers: Long) {
if (jobs.size.toLong() != numberOfWorkers) {
throw IllegalArgumentException("Invalid queue consumer health indicator configuration, the number of jobs is different than number of workers.");
}
this.jobs = jobs
this.numberOfWorkers = numberOfWorkers
}
fun health(): Boolean {
val numberOfActiveJobs = jobs.stream().filter { job -> job.isActive }.count()
logger.info{"expected: $numberOfWorkers, active jobs: $numberOfActiveJobs"};
if (numberOfWorkers == numberOfActiveJobs) {
return true;
}
logger.error("Health check failed because we have less jobs than expected. expected: $numberOfWorkers, active jobs: $numberOfActiveJobs");
return false;
}
}
解决方案
推荐阅读
- unity3d - 加载我的场景时发生未知错误。统一
- automation - 如何使用 VBS 捕获命令窗口的输出并通过播放的声音通知用户?
- preview - 使用 azure Form Recognizer api v2.1 通过 REST 训练模型时提供自定义模型名称
- mysql - MySql查询不同的条件
- php - 从 MySQL 填充数据表中的数据:Php Ajax
- linux - 如何更新 Argo CLI?
- javascript - e.preventDefault() 在 100% 的情况下不起作用
- pyspark - 如何在 json 文件可能具有不同数据大小的 spark 中正确读取 dynamodb json 数据(dynamodb 从 s3 导出的数据)
- python-3.x - 使用 k-means 和 Word2Vec 预测新数据点的集群
- c++ - 未找到入口点 - VS2019 OpenCV C++