apache-kafka - 如何即时/启动时创建 Kafka 主题以供生产者发送?
问题描述
我开始使用适用于 Kafka 的 Confluent .NET 库,并尝试实现我用于 Azure 服务总线的模式,以便在生产者应用程序启动时创建主题(如果不存在则创建)。这将如何在 Kafka API 中完成,并且可以完成吗?
这将允许主题成为源代码控制的一部分并在自动发布过程中进行配置,而不是手动设置每个主题/环境。此外,我希望我的开发人员不必去每个 Kafka 实例/环境并首先配置它们以匹配。
如果我不能这样做,我将不得不在发布过程中将其烘焙到 bash 脚本中,但更喜欢在启动代码中使用它。
解决方案
您可以启用集群范围的配置auto.create.topics.enable。
如果新的生产者尝试向尚不存在的主题发送数据,这将自动创建一个主题。
但是,请注意以下事项:
- 将使用复制、分区数和保留的默认设置创建主题。确保根据需要更改这些默认设置。无论如何,所有自动创建的主题都将具有相同的配置。
- 生产者代码中主题名称配置中的拼写错误可能会导致不必要的主题创建。
或者,您可以使用 AdminClient API。此处显示了一个示例:
static async Task CreateTopicAsync(string bootstrapServers, string topicName) { using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) { try { await adminClient.CreateTopicsAsync(new TopicSpecification[] { new TopicSpecification { Name = topicName, ReplicationFactor = 1, NumPartitions = 1 } }); } catch (CreateTopicsException e) { Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); } } }
推荐阅读
- google-coral - 如何编写一个迭代一定次数的for循环:Coral
- python - LeetCode 问题的运行时差异原因(打开锁)
- r - 如何在 R 中计算简单数据的置信区间
- c# - 在 Android 中关闭应用程序后扫掉留下的通知时,Xamarin Forms 出现“应用程序已停止”错误
- python - 在图例旁边添加值并更改图例的位置
- npm - JFrog:如何删除一个 npm 包的版本?
- amazon-web-services - 如何将 AWS NLB 中的流量路由到 4 个实例中的 2 个,除非它们关闭
- swift - 保存子上下文会通知获取的结果控制器,但保存父上下文不会
- javascript - 在字符串中的 Y 索引处查找 X 字符并使用此信息过滤数组
- python - 查找python中最大的纪元时间戳