go - 如何使用 Segmentio 的 kafka-go 创建 Kafka 主题?
问题描述
我可以得到一个使用segmentio的kafka-go创建主题的例子吗?
我尝试创建如下主题:
c, _ := kafka.Dial("tcp", "host:port")
kt := kafka.TopicConfig{Topic: "sometopic", NumPartitions: 1, ReplicationFactor: 1}
e := c.CreateTopics(kt)
但这只有在给定的host:port是 Kafka Leader 时才有效。如果host:port不是 Kafka Leader,那么我将收到此错误:
不是控制器:这不是该集群的正确控制器*
传递集群地址以创建主题的正确方法是什么?
Kafka Segmentio:github.com/segmentio/kafka-go
解决方案
这就是你需要的:
func (c *Conn) Controller() (broker Broker, err error)
// Controller requests kafka for the current controller and returns its URL
当您使用 Dial 在代码中打开连接时,您会随机选择集群中的一个代理。因此,您可能/可能不会最终使用实际的 Kafka 控制器。对控制器进行简单查找并打开新连接应该会有所帮助。
https://pkg.go.dev/github.com/segmentio/kafka-go?tab=doc#Conn.Controller
推荐阅读
- excel - 如果三列完全匹配,则在 Excel 中使用 countifs 条件返回值
- r - 我无法在 Rstudio 中编织/编译 .R 文件
- javascript - 我正在尝试访问中间件路由器防护中的存储数据(getter)但返回 false
- flutter - 使用 CustomScrollView 包含 BackdropFilter ,为什么整个屏幕已经模糊了
- vb.net - 将图标放在任务栏的中心?
- reactjs - 通过回调显示dash数据表
- go - Go protobuf 包冲突
- java - 覆盖 Tomca7 Maven 插件配置
- flutter - 在 GlobalKey 上调用 popUntil 时返回 null 的路由
- sql - Postgres SQL 状态:22P02 - 整数的无效输入语法