java - Spring kafka 在运行时重新创建 Kafka Stream Topology
问题描述
我有一个基于 spring boot、spring-kafka 和 kafka-streams 的应用程序。当应用程序启动时,它会创建带有默认主题列表的 kafka 流拓扑。我需要做的是在运行时编辑/重新创建拓扑。例如,当应用程序已经运行时,出现了新的主题名称,我想将此主题添加到我的拓扑中。目前,我正在考虑以某种方式删除现有拓扑,关闭并清理 KafkaStreams,在我创建拓扑但使用新主题名称的地方运行逻辑,然后再次启动 KafkaStreams。我不想重新启动我的应用程序。有人可以建议我如何在运行时执行此操作吗?
解决方案
我找到了 1 个解决方案。我扩展了 StreamsBuilderFactoryBean:
@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
@Primary
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration kStreamsConfigs) {
return new DynamicStreamsBuilderFactoryBean(kStreamsConfigs);
}
public static class DynamicStreamsBuilderFactoryBean extends StreamsBuilderFactoryBean {
private StreamsBuilder instance;
public DynamicStreamsBuilderFactoryBean(final KafkaStreamsConfiguration streamsConfig) {
super(streamsConfig);
}
@Override
public boolean isSingleton() {
return false;
}
@Override
protected synchronized StreamsBuilder createInstance() {
if (instance == null) {
instance = new StreamsBuilder();
}
return instance;
}
@Override
public synchronized void stop() {
instance = null;
super.stop();
}
}
当我构建拓扑时,我不使用 StreamsBuilder,而是使用 StreamsBuilderFactoryBean#getObject():
@Component
公共类动态流{
private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;
public void init() {
StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
//build topology
}
//call this method when stream reconfiguration is needed
public void reinitialize() {
streamsBuilderFactoryBean.stop();
init();
streamsBuilderFactoryBean.start();
}
}
推荐阅读
- java - 如果我们在使用 JAXB 时有未初始化的最终字段,如何引入无参数构造函数?
- android - 在 Flutter App 上构建失败“task assembleDebug failed with exit code 1”
- c++ - 如何从向量中获取最小值和最大值
2D图像的集合opencv c ++ - assembly - 如何使用 PIN 获取“lea”指令的操作数?
- spring-boot - 当我发布时,即使我声明了@RequestMapping,它也会显示在 Spring Boot 中找不到 404 错误
- c++ - 为什么即使没有错误,我的 UDP 连接也不会发送消息?
- python - 高效查询存储为 pickle 文件的两个大字典
- python-3.x - 当有点或逗号时如何在 PEP8 中使用反斜杠?
- pandas - Pandas 数据框转储到 excel 与颜色格式
- android - 有没有办法通过 Android 应用程序中的 StringRequest URL 访问 AWS RDS 数据库?