apache-kafka-streams - 当消费者在一个组内时恢复 kafka 流
问题描述
我的 Spring Cloud Stream 应用程序中有一个断路器。当电路改变状态并且我的流消费者是匿名的(不在组中)时,它可以很好地暂停/恢复流。
当消费者属于一个组时,暂停流效果很好,但恢复被“忽略”,最终以超时结束并离开组。任何解释为什么会发生这种不一致的行为?
Spring Cloud Stream 版本为 3.0.8.RELEASE。
这是我的断路器状态转换处理程序:
@Component
public class CircuitBreakerKafkaStream {
private final Logger log = LoggerFactory.getLogger(CircuitBreakerKafkaStream.class);
private List<InputBindingLifecycle> inputBindingLifecycles;
public CircuitBreakerKafkaStream(List<InputBindingLifecycle> inputBindingLifecycles) {
this.inputBindingLifecycles = inputBindingLifecycles;
}
@Override
// Pause or resume all of input bindings by the state of circuit breaker.
public void transitionHandler(CircuitBreaker.State toState) {
log.info("Circuit breaker is transitioning to {} state", toState.toString());
if (toState == CircuitBreaker.State.OPEN) {
log.info("Pausing kafka binder...");
gatherInputBindings().stream().forEach(binding -> binding.pause());
} else {
log.info("Resuming kafka binder...");
gatherInputBindings().stream().forEach(binding -> binding.resume());
}
}
private List<Binding<?>> gatherInputBindings() {
List<Binding<?>> inputBindings = new ArrayList<>();
for (InputBindingLifecycle inputBindingLifecycle : this.inputBindingLifecycles) {
Collection<Binding<?>> lifecycleInputBindings =
(Collection<Binding<?>>)
new DirectFieldAccessor(inputBindingLifecycle).getPropertyValue("inputBindings");
inputBindings.addAll(lifecycleInputBindings);
}
return inputBindings;
}
}
更新:我认为问题更具体。断路器具有open
、half_open
和close
状态。一旦电路关闭并恢复流,它会消耗一些消息,然后由于某种原因它会停止,直到发生轮询超时。
解决方案
您应该增加max.poll.timeout.ms
,默认设置为 5 分钟。如果流未在此限制内发送轮询请求,则 kafka 代理认为该流已死。
我们在 kafka 流中寻找延迟处理的解决方案,我们希望在几分钟内延迟消息传递并增加 max.poll.timeout.ms 以避免启动流。
推荐阅读
- python - 通过python中的列循环总结几列
- php - 我帐户中的 Woocommerce 订单查看页面显示 404 未找到
- flutter - 我在图像上添加了 Watermaek,但图像未加载
- typescript - 在 Vue 打字稿中使用 i18n
- image-manipulation - CodeIgniter 4 图像处理错误 withFile() null
- eloquent - Eloquent 如何使用 (NOT) EXISTS 和选择子查询
- at-command - GSM/GPRS 板 sim800c "AT+CTTS=?" 返回“错误”
- python - 具有多个输入的 TensorFlow 生成器读取多个文件会导致内核崩溃
- python - Python 套接字模块未检测到 TFTP RRQ
- python - Amazon Neptune 中批量加载的 CSV 数据格式