spring - How do I create many Kafka topics during spring-boot application startup?
Question
I have this configuration:
@Configuration
public class KafkaTopicConfig {

    private final TopicProperties topics;

    public KafkaTopicConfig(TopicProperties topics) {
        this.topics = topics;
    }

    @Bean
    public NewTopic newTopicImportCharge() {
        TopicProperties.Topic topic = topics.getTopicNameByType(MessageType.IMPORT_CHARGES.name());
        return new NewTopic(topic.getTopicName(), topic.getNumPartitions(), topic.getReplicationFactor());
    }

    @Bean
    public NewTopic newTopicImportPayment() {
        TopicProperties.Topic topic = topics.getTopicNameByType(MessageType.IMPORT_PAYMENTS.name());
        return new NewTopic(topic.getTopicName(), topic.getNumPartitions(), topic.getReplicationFactor());
    }

    @Bean
    public NewTopic newTopicImportCatalog() {
        TopicProperties.Topic topic = topics.getTopicNameByType(MessageType.IMPORT_CATALOGS.name());
        return new NewTopic(topic.getTopicName(), topic.getNumPartitions(), topic.getReplicationFactor());
    }

}
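I may add 10 different topics to TopicProperties, and I don't want to create each of these near-identical beans by hand. Is there a way, in spring-kafka or in plain Spring, to create all the topics at once?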
Answer
Use the admin client directly; you can create it from the configuration properties of the KafkaAdmin that Boot auto-configures.
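import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaAdmin;

@SpringBootApplication
public class So55336461Application {

    public static void main(String[] args) {
        SpringApplication.run(So55336461Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaAdmin kafkaAdmin) {
        return args -> {
            // try-with-resources closes the client once the topics are created
            try (AdminClient admin = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
                List<NewTopic> topics = new ArrayList<>();
                // build list
                admin.createTopics(topics).all().get();
            }
        };
    }

}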
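The "// build list" step is left open above. A minimal sketch of one way to fill it, under stated assumptions: TopicSpec is a hypothetical stand-in for the question's TopicProperties.Topic, the topic names are made up, and NewTopicStandIn is a local substitute for org.apache.kafka.clients.admin.NewTopic (whose constructor takes a name, an int partition count and a short replication factor) so that the sketch runs without kafka-clients on the classpath. Duplicate names are collapsed with the last entry winning.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TopicListSketch {

    // Hypothetical stand-in for TopicProperties.Topic from the question.
    record TopicSpec(String topicName, int numPartitions, short replicationFactor) {}

    // Local stand-in for Kafka's NewTopic; swap in the real class in an application.
    record NewTopicStandIn(String name, int numPartitions, short replicationFactor) {}

    // Builds one topic definition per configured name; a later entry with the
    // same name replaces an earlier one, and first-seen order is preserved.
    static List<NewTopicStandIn> buildTopics(List<TopicSpec> specs) {
        Map<String, NewTopicStandIn> byName = new LinkedHashMap<>();
        for (TopicSpec spec : specs) {
            byName.put(spec.topicName(), new NewTopicStandIn(
                    spec.topicName(), spec.numPartitions(), spec.replicationFactor()));
        }
        return new ArrayList<>(byName.values());
    }

    public static void main(String[] args) {
        List<TopicSpec> configured = List.of(
                new TopicSpec("import-charges", 3, (short) 1),
                new TopicSpec("import-payments", 3, (short) 1),
                new TopicSpec("import-charges", 6, (short) 1)); // duplicate name, last one wins
        buildTopics(configured).forEach(System.out::println);
    }
}
```

In the runner, the resulting list would be what gets passed to admin.createTopics(...), with the stand-in record replaced by the real NewTopic.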
EDIT
To check whether the topics already exist, or whether their partitions need to be increased, KafkaAdmin has this logic:
private void addTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {
    if (topics.size() > 0) {
        Map<String, NewTopic> topicNameToTopic = new HashMap<>();
        topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t));
        DescribeTopicsResult topicInfo = adminClient
                .describeTopics(topics.stream()
                        .map(NewTopic::name)
                        .collect(Collectors.toList()));
        List<NewTopic> topicsToAdd = new ArrayList<>();
        Map<String, NewPartitions> topicsToModify = checkPartitions(topicNameToTopic, topicInfo, topicsToAdd);
        if (topicsToAdd.size() > 0) {
            addTopics(adminClient, topicsToAdd);
        }
        if (topicsToModify.size() > 0) {
            modifyTopics(adminClient, topicsToModify);
        }
    }
}

private Map<String, NewPartitions> checkPartitions(Map<String, NewTopic> topicNameToTopic,
        DescribeTopicsResult topicInfo, List<NewTopic> topicsToAdd) {

    Map<String, NewPartitions> topicsToModify = new HashMap<>();
    topicInfo.values().forEach((n, f) -> {
        NewTopic topic = topicNameToTopic.get(n);
        try {
            TopicDescription topicDescription = f.get(this.operationTimeout, TimeUnit.SECONDS);
            if (topic.numPartitions() < topicDescription.partitions().size()) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info(String.format(
                        "Topic '%s' exists but has a different partition count: %d not %d", n,
                        topicDescription.partitions().size(), topic.numPartitions()));
                }
            }
            else if (topic.numPartitions() > topicDescription.partitions().size()) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info(String.format(
                        "Topic '%s' exists but has a different partition count: %d not %d, increasing "
                        + "if the broker supports it", n,
                        topicDescription.partitions().size(), topic.numPartitions()));
                }
                topicsToModify.put(n, NewPartitions.increaseTo(topic.numPartitions()));
            }
        }
        catch (@SuppressWarnings("unused") InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (TimeoutException e) {
            throw new KafkaException("Timed out waiting to get existing topics", e);
        }
        catch (@SuppressWarnings("unused") ExecutionException e) {
            topicsToAdd.add(topic);
        }
    });
    return topicsToModify;
}
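
The decision that checkPartitions makes can be hard to see through the logging and exception handling. The sketch below restates only that decision in plain Java, as an illustration rather than the library's code. It assumes the broker state has already been reduced to a map from topic name to current partition count, where a missing key means the topic does not exist (the ExecutionException branch in the excerpt). The class, method and topic names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TopicPlanSketch {

    // Topics missing on the broker that need to be created.
    final List<String> toCreate = new ArrayList<>();

    // Existing topics that need more partitions: name -> target partition count.
    final Map<String, Integer> toIncrease = new LinkedHashMap<>();

    // desired:  topic name -> wanted partition count
    // existing: topic name -> current partition count (absent key = topic missing)
    static TopicPlanSketch planChanges(Map<String, Integer> desired, Map<String, Integer> existing) {
        TopicPlanSketch result = new TopicPlanSketch();
        desired.forEach((name, wanted) -> {
            Integer current = existing.get(name);
            if (current == null) {
                result.toCreate.add(name);            // topic not found: create it
            }
            else if (wanted > current) {
                result.toIncrease.put(name, wanted);  // like NewPartitions.increaseTo(wanted)
            }
            // wanted <= current: no change; the excerpt only logs when fewer are wanted
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> desired = new LinkedHashMap<>();
        desired.put("import-charges", 3);   // missing on the broker
        desired.put("import-payments", 6);  // exists with 3 partitions
        desired.put("import-catalogs", 2);  // exists with 4 partitions
        Map<String, Integer> existing = Map.of("import-payments", 3, "import-catalogs", 4);

        TopicPlanSketch result = planChanges(desired, existing);
        System.out.println("create: " + result.toCreate);
        System.out.println("increase: " + result.toIncrease);
    }
}
```

A topic that already has more partitions than requested is left alone, since Kafka does not support reducing a topic's partition count.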