java - 在 Spring Boot 中暂停/启动 Kafka 流处理器
问题描述
我将为消息实现断路器模式。基本要求是,如果微服务无法将消息发布到发布主题,它应该停止接受来自其他 Kafka 主题的消息。当发布主题可用时,微服务应该开始接受来自其他 Kafka 主题的消息。
有没有办法在 Spring boot Kafka Streams 中实现这一点?
解决方案
我能够通过使用BindingsEndpoint
.
private final BindingsEndpoint binding;
@Override
public void stop() {
List<?> objects = binding.queryStates();
if (!objects.isEmpty()) {
log.info("Stopping Kafka topics ");
List<Binding> bindings = getBindings(objects);
bindings.forEach(Binding::stop);
log.info("Stopped Kafka topics ");
}
}
@Override
public void start() {
List<?> objects = binding.queryStates();
if (!objects.isEmpty()) {
log.info("Starting Kafka topics ");
List<Binding> bindings = getBindings(objects);
bindings.forEach(Binding::start);
log.info("Started Kafka topics ");
}
}
protected List<Binding> getBindings(List<?> objects) {
return objects.stream().filter(object -> object instanceof Binding)
.map(object -> (Binding) object).collect(Collectors.toList());
}
推荐阅读
- c# - 使用 ModelStoreConatiner 名称作为 Schema 对特定表查询 db 的 EF 查询
- assembly - 我应该使用哪个汇编程序在 dosbox 中编译此代码
- javascript - 需要帮助在函数中更改 SVG 的颜色 - Skycons
- node.js - 在 QueryFile 中进行多次插入
- spring - Spring Batch 如何执行 Skip 或不考虑编写?
- express - 连接gridfs multer和mongodb时缺少db参数错误
- mysql - MySQL 基于表定义创建或更新
- c - 如何使用 FFMpeg 将图像转换为视频以用于嵌入式应用程序?
- c# - 防止新数据快照创建额外的孩子
- c++ - 我可以依靠编译器查找和优化简单的布尔循环不变量吗?