go - kafka-go 库 - 从最新民意调查中获取所有记录作为消息片段
问题描述
我正在使用segmentio/kafka-go库,特别是 kafka.Reader,但它一次只能通过 FetchMessage 函数读取 1 条记录。我的设置目前看起来像这样:
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: buildTlsConfig(certPath, tlsEnabled),
}
// make a new kafka reader that is part of the given consumer group
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{bootstrapBrokerAddress},
GroupID: group,
Topic: topic,
Dialer: dialer,
})
// in a loop in a diff part of the code
m, err := kr.reader.FetchMessage(ctx)
为了提高业务逻辑的效率,我想将每次轮询迭代中的所有消息作为一个批次读取。我希望有一个像这样的简单功能:
func (r *Reader) FetchMessages(ctx context.Context) ([]Message, error)
或者至少是一种方法来检查是否从上次民意调查中获取了更多记录,这样我就可以自己收集它们,但我在 api 中看不到任何类似的东西。我知道 Conn 对象上有较低级别的函数,但我想利用读者的消费者组处理。
有没有一种简单的方法可以从每个投票中获取所有消息作为切片?
解决方案
在您提供的同一链接上,有一个调用 ReadBatch 的示例
// to consume messages
topic := "my-topic"
partition := 0
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
b := make([]byte, 10e3) // 10KB max per message
for {
_, err := batch.Read(b)
if err != nil {
break
}
fmt.Println(string(b))
}
if err := batch.Close(); err != nil {
log.Fatal("failed to close batch:", err)
}
if err := conn.Close(); err != nil {
log.Fatal("failed to close connection:", err)
}
推荐阅读
- python - 熊猫 str 分裂
- react-native - React-Native-Video -> 如何保存视频
- laravel - Laravel - 弹性搜索
- python - 无法刮取所有产品色板
- python-3.x - 如果 k,则在大小为 n 的列表的第 k 个位置添加元素的最佳方法
我知道可以通过重新定义列表并添加三个列表来在列表中添加一个元素,而不是作为第一个元素或最后一个元素:
# I want to add 5 into [1,2,3,4,6,7,8,9,0] between the 4 and the 6 A=[1,2,3,4,6,7,8,9,0] A=[1,2,3,4]+[5]+[6,7
- r - 根据条件与 R 中的其他数据集创建变量
- mysql - NodeJS xdevapi - 创建与 MySQL 数据库的连接
- javascript - 按内容选择消息(discord.js)
- c# - 为什么简单程序的bin输出中有这么多DLL
- java - 如何实现 java tic tac toe 游戏功能