apache-kafka - Camel - RoutePolicy - ThrottlingExceptionRoutePolicy 不适用于多个消费者
问题描述
在我们的应用程序中,我们使用 kafka 组件在手动提交模式下使用 kafka 主题的消息。如果我们在后续过程中遇到异常(外部服务调用失败 - 抛出 RetryableException),我们已经连接了 ThrottlingExceptionRoutePolicy 以打开电路并基于 HalfOpenHandler 重试电路关闭,遇到了 RetryableException。
ThrottlingExceptionRoutePolicy { throttledException - RetryableException,failureThreshold - 1,failureWindow = 1 mins halfOpenAfter - x mins keepOpen = false }
from("kafka:topicName").routePolicy(exceptionPolicy).to(anotherProcess);
当我们每个 pod 运行单个消费者时,电路打开/关闭按预期工作。但是在有多个消费者的情况下,电路是打开的,但不会尝试关闭电路。
查看 ThrottlingExceptionRoutePolicy 代码:
第一个和第二个消费者线程都在等待获取锁。
线程 T1 获得锁,线程 T2 正在等待。
消费者停止,状态为 OPEN,policy.openedAt - t1,halfOpenTask(h1) 计划在 x(阈值)ms 后运行,policy.halfOpenTimer => h1
锁被释放
线程 T2 获取锁。
消费者已经停止,状态仍然是 OPEN,policy.openedAt = t2 (t2 > t1),halfOpenTask (h2 - new instance) 计划在 x 毫秒后运行。policy.halfOpenTimer => h2。
一直以来,h1 计时器都在倒计时。一旦计时器完成,HalfOpen 任务就会排队并启动。
在 HalfOpen 任务的运行方法中 ->
7.a 取消 policy.halfOpenTimer (h2) 7.b 所以 h2 半开计划任务被取消 7.c 调用 this.calculateState()7.c.1 check the state is OPEN and if elapsedTime >= threshold X , then call half-open handler but the elapsedTime is currentTs (t1+x) - openedAt(t2) so the condition is never true. So the half-open handler is never called.
这可能发生在并发消费者 > 1 的任何骆驼路线上。
有没有人遇到过类似的问题?
解决方案
它发生在一个 concurrentConsumers=1 以及 concurrencyLimit: 50 显然计时器永远不会关闭电路的情况下。这是记录的最后一条语句。2021-08-04 15:56:59.473 调试 99845 --- [askExecutor-104] oactThrottlingExceptionRoutePolicy:打开电路...
此外,ThrottlingExceptionRoutePolicy.failures 只是在 failureWindow 之外保持递增。
protected boolean isThresholdExceeded() {
boolean output = false;
this.logState();
if (this.failures.get() >= this.failureThreshold && this.lastFailure >= System.currentTimeMillis() - this.failureWindow) {
output = true;
}
return output;
}
只要失败> = failureThreshold(将在某个时候)并且lastFailure发生在failureWindow内,“输出”就会为真。除非调用 closeCircuit,否则不会设置失败。那是一个错误。
推荐阅读
- aws-lambda - 如何在 API 网关和云端访问 lambda 中的主机?
- html - 如何创建相对于其他元素的响应式嵌套 SVG 线?
- azure - 如何在不授予整个订阅所有权的情况下部署 ARM 模板?
- python - 为什么在 except 子句中使用“或”不会导致 SyntaxError?它有有效的用途吗?
- android - 如何通过在android中将文本转换为pdf和pdf到docx来将文本转换为docx
- php - 随机函数范围从1> 10,生成所需的数字
- python - 在 macOS Catalina 中使用 python3 更改 python 语句的问题
- asp.net-core - 为什么我收到错误 AADSTS50011:请求中指定的回复 url 与为应用程序配置的回复 url 不匹配?
- c# - EF 核心不为 InMemoryDatabase 插入所有记录
- android - Android中的模糊imageView