首页 > 解决方案 > 如何即时/启动时创建 Kafka 主题以供生产者发送?

问题描述

我开始使用适用于 Kafka 的 Confluent .NET 库,并尝试实现我用于 Azure 服务总线的模式,以便在生产者应用程序启动时创建主题(如果不存在则创建)。这将如何在 Kafka API 中完成,并且可以完成吗?

这将允许主题成为源代码控制的一部分并在自动发布过程中进行配置,而不是手动设置每个主题/环境。此外,我希望我的开发人员不必去每个 Kafka 实例/环境并首先配置它们以匹配。

如果我不能这样做,我将不得不在发布过程中将其烘焙到 bash 脚本中,但更喜欢在启动代码中使用它。

标签: apache-kafkakafka-producer-apiconfluent-kafka-dotnet

解决方案


您可以启用集群范围的配置auto.create.topics.enable

如果新的生产者尝试向尚不存在的主题发送数据,这将自动创建一个主题。

但是,请注意以下事项:

  • 将使用复制、分区数和保留的默认设置创建主题。确保根据需要更改这些默认设置。无论如何,所有自动创建的主题都将具有相同的配置。
  • 生产者代码中主题名称配置中的拼写错误可能会导致不必要的主题创建。

或者,您可以使用 AdminClient API。此处显示了一个示例:

static async Task CreateTopicAsync(string bootstrapServers, string topicName) { using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) { try { await adminClient.CreateTopicsAsync(new TopicSpecification[] { new TopicSpecification { Name = topicName, ReplicationFactor = 1, NumPartitions = 1 } }); } catch (CreateTopicsException e) { Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); } } }

推荐阅读