multithreading - 失败时重启 Kafka 消费者线程
问题描述
我有一个启动六个线程的应用程序。每个线程都包含一个 KafkaConsumer,从一个主题进行轮询并对记录进行一些处理。每个线程的主程序流程可以概括为
override fun run() {
try {
val pollDuration = appConfig.property("kafka.pollDurationSeconds").getString().toLong()
while (true) {
val records = consumer.poll(Duration.ofSeconds(pollDuration))
performCustomAnalysis(records)
addToCache(records)
consumer.commitSync()
if (shouldDoTimeBasedAnalysis()) {
timeBasedAnalysis()
cleanUpCache()
}
}
} catch (e: Exception) {
log.error("Unexpected event happened. e=$e", e)
} finally {
consumer.close()
cache.close()
}
}
线程像这样从主应用程序启动
fun main(args: Array<String>): Unit = io.ktor.server.netty.EngineMain.main(args)
fun Application.module() {
// Create consumers, etc...
Thread1(
topicId1,
consumer1,
).start()
Thread2(
topicId2,
consumer2,
).start()
Thread3(
topicId3,
consumer3,
).start()
Thread4(
topicId4,
consumer4,
).start()
Thread5(
topicId5,
consumer5,
).start()
Thread6(
topicId6,
consumer6,
).start()
}
不幸的是,线程偶尔会由于各种原因(超时、丢失连接等)而崩溃。我想知道是否有可能确保线程总是在失败时重新启动?Kotlin 中是否存在某种类型的看门狗?
解决方案
推荐阅读
- python - 不显示缺失值matplotlib
- python - pyenv 在 macos 上给出 shopt 命令未找到错误
- python - 'InstrumentedList' 对象在遍历关系时没有属性 'Location'
- scala - 在 spark 和 scala 中旋转数据框
- mule - 运行 Munit 时出现错误“org.mule.runtime.api.exception.DefaultMuleException: Error while parsing script: %dw”
- python - Python从文件中读取URL直到最后一行
- javascript - 多个图像选择在 React Native 中不起作用
- html - 在一行中对齐 href 按钮和图像按钮
- python - 打印 python 美丽的汤对象时出现 Unicode 错误
- c# - Xamarin Forms FlyOut 菜单不显示在 ContentViews 上,除非通过菜单访问