go - 用 kafka go 读取 __consumer_offsets
问题描述
我想阅读主题 __consumer_offsets 使用这个库:https ://github.com/segmentio/kafka-go
我的问题是,除非我指定一个分区,否则似乎什么都不会发生。默认情况下这个主题有100个分区,查询kafka的分区列表然后循环读取它们似乎是不合理的,我希望库中有一个预先存在的方法来读取来自所有分区的消息在话题中。
目前以下工作,在我用 kafkacat 验证 __consumer_offsets 主题的分区 15 中有消息后:
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"kafka:9092"},
Topic: "__consumer_offsets",
Partition: 15
})
r.SetOffset(0)
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
log.Println("Error while trying to read message")
log.Fatal(err)
break
}
log.Printf("message at offset %d\n", m.Offset)
}
r.Close()
我想除非需要,否则分区选择应该在用户级别上是透明的。我错了吗?
无论消息位于哪个分区,有没有办法从主题中读取?或者改写,从所有分区中读取?
解决方案
使用消费者组API,不需要给分区。
https://github.com/segmentio/kafka-go#consumer-groups
// GroupID holds the optional consumer group id. If GroupID is specified, then // Partition should NOT be specified e.g. 0 GroupID string // Partition to read messages from. Either Partition or GroupID may // be assigned, but not both Partition int
https://godoc.org/github.com/segmentio/kafka-go#ReaderConfig
推荐阅读
- c# - 为什么这个具体的泛型类型不能被履行相同契约的具体类型所取代?
- javascript - 反应数组映射中的设置值不起作用
- google-apps-script - 在 Google App Script 中使用 DriveApp 永久删除 Google Drive 文件
- server - 如何通过 FileZilla 访问 Google Kubernetes Engine FTP 服务器
- c# - 在团队源代码管理中处理实体框架代码优先迁移
- multilingual - Kentico 11 不一致的文化选择和重定向
- python - 打印单个拆分字符串
- javascript - 如何在乐队上创建带有日期的范围滚动条
- android - Firebase Crashlytics 与 Firebase-core gradle 构建与 ProjectConfigurationException
- python - Tkinter IntVar 不工作