首页 > 解决方案 > 用 kafka go 读取 __consumer_offsets

问题描述

我想阅读主题 __consumer_offsets 使用这个库:https ://github.com/segmentio/kafka-go

我的问题是,除非我指定一个分区,否则似乎什么都不会发生。默认情况下这个主题有100个分区,查询kafka的分区列表然后循环读取它们似乎是不合理的,我希望库中有一个预先存在的方法来读取来自所有分区的消息在话题中。

目前以下工作,在我用 kafkacat 验证 __consumer_offsets 主题的分区 15 中有消息后:

  r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"kafka:9092"},
    Topic:     "__consumer_offsets",
    Partition: 15
  })
  r.SetOffset(0)

  for {
    m, err := r.ReadMessage(context.Background())
    if err != nil {
      log.Println("Error while trying to read message")
      log.Fatal(err)
      break
    }
    log.Printf("message at offset %d\n", m.Offset)
  }

  r.Close()

我想除非需要,否则分区选择应该在用户级别上是透明的。我错了吗?

无论消息位于哪个分区,有没有办法从主题中读取?或者改写,从所有分区中读取?

标签: goapache-kafka

解决方案


使用消费者组API,不需要给分区。

https://github.com/segmentio/kafka-go#consumer-groups

// GroupID holds the optional consumer group id.  If GroupID is specified, then
// Partition should NOT be specified e.g. 0
GroupID string


// Partition to read messages from.  Either Partition or GroupID may
// be assigned, but not both
Partition int

https://godoc.org/github.com/segmentio/kafka-go#ReaderConfig


推荐阅读