go - 如何确保我的消费者只按顺序处理 kafka 主题中的消息一次?
问题描述
我以前从未使用过卡夫卡。我有两个访问本地 kafka 实例的测试 Go 程序:一个读取器和一个写入器。我正在尝试调整我的生产者、消费者和 kafka 服务器设置以获得特定行为。
我的作家:
package main
import (
"fmt"
"math/rand"
"strconv"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
rand.Seed(time.Now().UnixNano())
topics := []string{
"policymanager-100",
"policymanager-200",
"policymanager-300",
}
progress := make(map[string]int)
for _, t := range topics {
progress[t] = 0
}
producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "0",
})
if err != nil {
panic(err)
}
defer producer.Close()
fmt.Println("producing messages...")
for i := 0; i < 30; i++ {
index := rand.Intn(len(topics))
topic := topics[index]
num := progress[topic]
num++
fmt.Printf("%s => %d\n", topic, num)
msg := &kafka.Message{
Value: []byte(strconv.Itoa(num)),
TopicPartition: kafka.TopicPartition{
Topic: &topic,
},
}
err = producer.Produce(msg, nil)
if err != nil {
panic(err)
}
progress[topic] = num
time.Sleep(time.Millisecond * 100)
}
fmt.Println("DONE")
}
我的本地 kafka 上存在三个主题:policymanager-100、policymanager-200、policymanager-300。它们每个只有 1 个分区,以确保所有消息都按 kafka 接收它们的时间排序。我的作者将随机选择其中一个主题并发出一条消息,其中包含一个仅针对该主题递增的数字。当它完成运行时,我希望队列看起来像这样(为了便于阅读而缩短了主题名称):
100: 1 2 3 4 5 6 7 8 9 10 11
200: 1 2 3 4 5 6 7
300: 1 2 3 4 5 6 7 8 9 10 11 12
到目前为止,一切都很好。我正在尝试进行配置,以便可以启动任意数量的消费者并按顺序使用这些消息。“按顺序”是指在消息 1 完成(而不是刚刚开始)之前,任何消费者都不应该获得主题 100 的消息 2。如果正在处理主题 100 的消息 1,则消费者可以自由地从当前没有正在处理的消息的其他主题中消费。如果某个主题的消息已发送给消费者,则整个主题应“锁定”,直到超时假定消费者失败或消费者提交消息,然后该主题被“解锁”以使其下一条消息可供消费。
我的读者:
package main
import (
"fmt"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
count := 2
for i := 0; i < count; i++ {
go consumer(i + 1)
}
fmt.Println("cosuming...")
// hold this thread open indefinitely
select {}
}
func consumer(id int) {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "0", // strconv.Itoa(id),
"enable.auto.commit": "false",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{`^policymanager-.+$`}, nil)
for {
msg, err := c.ReadMessage(-1)
if err != nil {
panic(err)
}
fmt.Printf("%d) Message on %s: %s\n", id, msg.TopicPartition, string(msg.Value))
time.Sleep(time.Second)
_, err = c.CommitMessage(msg)
if err != nil {
fmt.Printf("ERROR commiting: %+v\n", err)
}
}
}
根据我目前的理解,我可能实现这一目标的方式是正确设置我的消费者。我已经尝试了这个程序的许多不同的变体。我试过让我所有的 goroutine 共享同一个消费者。我尝试group.id
为每个 goroutine 使用不同的。这些都不是获得我所追求的行为的正确配置。
发布的代码的作用是一次清空一个主题。尽管有多个 goroutine,但该进程将读取所有 100,然后移动到 200 再到 300,并且实际上只有一个 goroutine 会完成所有读取。当我让每个 goroutine 有一个不同的group.id
然后消息被多个 goroutines 读取时,我想阻止。
我的示例消费者只是简单地用 goroutines 分解事物,但是当我开始将这个项目用于我的工作用例时,我需要它来跨多个不会相互通信的 kubernetes 实例工作,因此使用任何交互的东西一旦 2 个 kube 上有 2 个实例,goroutines 就不会工作。这就是为什么我希望让卡夫卡做我想要的把关。
解决方案
一般来说,你不能。即使您有一个消费者消费了该主题的所有分区,分区也会以不确定的顺序被消费,并且无法保证您在所有分区中的总排序。
试试 Keyed Messages,认为您可能会发现这对您的用例很有用。
推荐阅读
- reactjs - Why is there no error when requesting interface data?
- xamarin - Remove underline from Entry Control in Xamarin Forms
- azure - Executing code in azure example getting error as node already exists
- javascript - 箭头函数在反应组件中变成“普通”函数表达式
- android - why can't i run my app? why is it showing the message below?
- mongodb - MogoDB aggregate with $count and $lookup expected performance
- artillery - 炮兵:ramto 在炮兵中究竟是如何工作的?
- hive - 为什么从 Presto 中的 dtap:// hive 表读取时总是得到 0 条记录作为输出?
- api - Symfony 4 - API 管理多个文件上传(无捆绑)
- spring - 字段 restTemplate 需要一个 bean,但找到了 2 个