首页 > 解决方案 > 失败时重启 Kafka 消费者线程

问题描述

我有一个启动六个线程的应用程序。每个线程都包含一个 KafkaConsumer,从一个主题进行轮询并对记录进行一些处理。每个线程的主程序流程可以概括为

override fun run() {
        try {
            val pollDuration = appConfig.property("kafka.pollDurationSeconds").getString().toLong()

            while (true) {
                val records = consumer.poll(Duration.ofSeconds(pollDuration))
                performCustomAnalysis(records)
                addToCache(records)
                consumer.commitSync()
                if (shouldDoTimeBasedAnalysis()) {
                    timeBasedAnalysis()
                    cleanUpCache()
                }
            }
        } catch (e: Exception) {
            log.error("Unexpected event happened. e=$e", e)
        } finally {
            consumer.close()
            cache.close()
        }
    }

线程像这样从主应用程序启动

fun main(args: Array<String>): Unit = io.ktor.server.netty.EngineMain.main(args)

fun Application.module() {

   // Create consumers, etc...

   Thread1(
        topicId1,
        consumer1,
    ).start()

    Thread2(
        topicId2,
        consumer2,
    ).start()

    Thread3(
        topicId3,
        consumer3,
    ).start()

    Thread4(
        topicId4,
        consumer4,
    ).start()

    Thread5(
        topicId5,
        consumer5,
    ).start()

    Thread6(
        topicId6,
        consumer6,
    ).start()
}

不幸的是,线程偶尔会由于各种原因(超时、丢失连接等)而崩溃。我想知道是否有可能确保线程总是在失败时重新启动?Kotlin 中是否存在某种类型的看门狗?

标签: multithreadingkotlinexceptionapache-kafka

解决方案


推荐阅读