首页 > 解决方案 > Kotlin 协程 isActive 即使协程没有运行也为真

问题描述

我有一个工作人员从两个 AWS SQS 队列中消费消息,这个工作人员是用 kotlin 编写的,并且有 1 个协程来消费队列消息,这个消费的消息被插入到一个通道中,并且每个队列有 10 个协程从这个通道消费。

队列 1 - 1 个消费消息的协程,10 个处理消息的
协程 队列 2 - 1 个消费消息的协程,10 个处理消息的协程

这个工人还有一个“协程监视器”,每个协程被插入到一个列表中,并在 kubernetes 健康检查中检查协程状态。

由于某种原因,协程停止了,但协程的状态仍然处于活动状态,并且健康检查仍然没有重新启动 pod

有没有人有任何建议来解决这个问题?

代码与本文基本一致:https ://jivimberg.io/blog/2019/02/23/sqs-consumer-using-kotlin-coroutines/

健康检查码

class CoroutineHealth {

    private lateinit var jobs: List<Job>
    private var numberOfWorkers: Long = 0
    companion object {
        val logger = KotlinLogging.logger("com.ifood.cx.config.health.CoroutineHealth")
    }


    fun updateJobsNumber(jobs: List<Job>, numberOfWorkers: Long) {
        if (jobs.size.toLong() != numberOfWorkers) {
            throw  IllegalArgumentException("Invalid queue consumer health indicator configuration, the number of jobs is different than number of workers.");
        }
        this.jobs = jobs
        this.numberOfWorkers = numberOfWorkers
    }

    fun health(): Boolean {
        val numberOfActiveJobs = jobs.stream().filter { job -> job.isActive }.count()
        logger.info{"expected: $numberOfWorkers, active jobs: $numberOfActiveJobs"};
        if (numberOfWorkers == numberOfActiveJobs) {
            return true;
        }
        logger.error("Health check failed because we have less jobs than expected. expected: $numberOfWorkers, active jobs: $numberOfActiveJobs");
        return false;
    }


}

标签: kotlinkotlin-coroutines

解决方案


推荐阅读