首页 > 解决方案 > 如何使用 Segmentio 的 kafka-go 创建 Kafka 主题?

问题描述

我可以得到一个使用segmentio的kafka-go创建主题的例子吗?

我尝试创建如下主题:

c, _ := kafka.Dial("tcp", "host:port")
kt := kafka.TopicConfig{Topic: "sometopic", NumPartitions: 1, ReplicationFactor: 1}
e := c.CreateTopics(kt)

但这只有在给定的host:port是 Kafka Leader 时才有效。如果host:port不是 Kafka Leader,那么我将收到此错误:

不是控制器:这不是该集群的正确控制器*

传递集群地址以创建主题的正确方法是什么?

Kafka Segmentio:github.com/segmentio/kafka-go

标签: gokafka-topicsegment-io

解决方案


这就是你需要的:

func (c *Conn) Controller() (broker Broker, err error)
// Controller requests kafka for the current controller and returns its URL

当您使用 Dial 在代码中打开连接时,您会随机选择集群中的一个代理。因此,您可能/可能不会最终使用实际的 Kafka 控制器。对控制器进行简单查找并打开新连接应该会有所帮助。

https://pkg.go.dev/github.com/segmentio/kafka-go?tab=doc#Conn.Controller


推荐阅读