首页 > 解决方案 > kafka-go 库 - 从最新民意调查中获取所有记录作为消息片段

问题描述

我正在使用segmentio/kafka-go库,特别是 kafka.Reader,但它一次只能通过 FetchMessage 函数读取 1 条记录。我的设置目前看起来像这样:

dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       buildTlsConfig(certPath, tlsEnabled),
}

// make a new kafka reader that is part of the given consumer group
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers: []string{bootstrapBrokerAddress},
    GroupID: group,
    Topic:   topic,
    Dialer:  dialer,
})

// in a loop in a diff part of the code
m, err := kr.reader.FetchMessage(ctx)

为了提高业务逻辑的效率,我想将每次轮询迭代中的所有消息作为一个批次读取。我希望有一个像这样的简单功能:

func (r *Reader) FetchMessages(ctx context.Context) ([]Message, error)

或者至少是一种方法来检查是否从上次民意调查中获取了更多记录,这样我就可以自己收集它们,但我在 api 中看不到任何类似的东西。我知道 Conn 对象上有较低级别的函数,但我想利用读者的消费者组处理。

有没有一种简单的方法可以从每个投票中获取所有消息作为切片?

标签: goapache-kafka

解决方案


在您提供的同一链接上,有一个调用 ReadBatch 的示例

// to consume messages
topic := "my-topic"
partition := 0

conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
    log.Fatal("failed to dial leader:", err)
}

conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

b := make([]byte, 10e3) // 10KB max per message
for {
    _, err := batch.Read(b)
    if err != nil {
        break
    }
    fmt.Println(string(b))
}

if err := batch.Close(); err != nil {
    log.Fatal("failed to close batch:", err)
}

if err := conn.Close(); err != nil {
    log.Fatal("failed to close connection:", err)
}

推荐阅读