go - 重新订阅 Kafka-topic 并仅获取新消息
问题描述
我正在构建一个应用程序,我需要在其中即时订阅和取消订阅 Kafka 主题。问题是我找不到重新订阅主题的方法,因此我只能收到订阅后出现的该主题的新消息。
设置时,"auto.offset.reset": "latest"
我只会在第一次订阅时收到新消息,而在以后的订阅中不会收到。
每次我需要订阅一个新主题时,我是否应该创建一个新的消费者组?
更新:
我尝试像这样设置消费者,这是正确的方法,但问题是我已经用我的 groupId 提交了偏移量。通过更改 groupId 解决了问题。
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": os.Getenv("KAFKA_BOOTSTRAP_SERVERS"),
"group.id": "foobar",
"enable.auto.commit": false,
"auto.offset.reset": "latest",
})
解决方案
这是使用消费者组偏移跟踪时需要考虑的关键部分
每天听某些主题几次,最多几分钟...对这些时期之间发生的消息不感兴趣
要始终让您的消费者“关注”主题,请enable.auto.commit=false
与 一起设置auto.offset.reset=latest
,并且根本不提交偏移量
否则,seekToEnd()
是你想要的消费者方法
推荐阅读
- javascript - What technologies are used in the editor of Canva
- python - 训练问题,Val 损失和准确率不变
- airflow - 如何计算 Airflow DAG 的总执行时间
- ethereum - web3.eth.getAccounts(); 当前不工作
- javascript - 使用 Ruby 与 Cytoscape.js API 交互
- python - 我需要删除python中大于1的列表元素
- java - 如何在Java中分配内部类数组?
- bash - 在bash中自动输入初始输入后等待用户输入
- java - 从 Jdeveloper 加载 Jasper 报告
- css - 无法在 React 中导入 fontello 生成的 css