spring-boot - 在修改主题时重置 Spring Boot Kafka Stream Application
问题描述
我正在使用.spring-kafka
在 Spring Boot 应用程序中运行 Kafka Stream StreamsBuilderFactoryBean
。我通过删除和重新创建将某些主题中的分区数从 100 更改为 20,但现在在运行应用程序时,出现以下错误:
Existing internal topic MyAppId-KSTREAM-AGGREGATE-STATE-STORE-0000000092-changelog has invalid partitions: expected: 20; actual: 100. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.
我无法访问课程kafka.tools.StreamsResetter
并尝试调用StreamsBuilderFactoryBean.getKafkaStreams.cleanup()
,但它给出了NullPointerException
. 我该如何进行上述清理?
解决方案
相关文档在这里。
第 1 步:本地清理
对于带有StreamsBuilderFactoryBean
的 Spring Boot,第一步可以通过简单地添加CleanerConfig
到构造函数来完成:
// Before
new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(config));
// After
new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(config), new CleanupConfig(true, true));
KafkaStreams.cleanUp()
这可以在 beforestart()
和 after上调用该方法stop()
。
第 2 步:全局清理
对于第二步,在应用程序的所有实例停止的情况下,只需按照文档中的说明使用该工具:
# In kafka directory
bin/kafka-streams-application-reset.sh --application-id "MyAppId" --bootstrap-servers 1.2.3.4:9092 --input-topics x --intermediate-topics first_x,second_x,third_x --zookeeper 1.2.3.4:2181
这是做什么的:
对于任何指定的输入主题:将应用程序的已提交消费者偏移量重置为所有分区的“主题开始”(对于消费者组 application.id)。
对于任何指定的中间主题:跳到主题的末尾,即将所有分区的应用程序提交的消费者偏移量设置为每个分区的logSize(对于消费者组application.id)。
对于任何内部主题:删除内部主题(这也将删除已提交的相应已提交偏移量)。
推荐阅读
- python - 如何将 while 循环与“try”和“except”一起使用?
- java - 如何将 List 对象的值设置为另一个 List 对象?
- c++ - async_write 连续两次出现问题
- java - 假装客户端内部服务器异常(其余模板也不起作用)
- terraform - Terraform 重命名状态文件
- java - 如何将字符串数组 [] 元素转换/添加到列表中
[] 大批 - javascript - materialize-css 选择样式 display:none 应用于 ckeditor 对话
- javascript - npm 错误!缺少脚本:构建;
- c# - WPF ListView 绑定更新选定索引
- r - 无法从链接导入表(期权链):https://www.nseindia.com/option-chain