首页 > 解决方案 > Camel - RoutePolicy - ThrottlingExceptionRoutePolicy 不适用于多个消费者

问题描述

在我们的应用程序中,我们使用 kafka 组件在手动提交模式下使用 kafka 主题的消息。如果我们在后续过程中遇到异常(外部服务调用失败 - 抛出 RetryableException),我们已经连接了 ThrottlingExceptionRoutePolicy 以打开电路并基于 HalfOpenHandler 重试电路关闭,遇到了 RetryableException。

ThrottlingExceptionRoutePolicy { throttledException - RetryableException,failureThreshold - 1,failureWindow = 1 mins halfOpenAfter - x mins keepOpen = false }

from("kafka:topicName").routePolicy(exceptionPolicy).to(anotherProcess);

当我们每个 pod 运行单个消费者时,电路打开/关闭按预期工作。但是在有多个消费者的情况下,电路是打开的,但不会尝试关闭电路。

查看 ThrottlingExceptionRoutePolicy 代码:

第一个和第二个消费者线程都在等待获取锁。

  1. 线程 T1 获得锁,线程 T2 正在等待。

  2. 消费者停止,状态为 OPEN,policy.openedAt - t1,halfOpenTask(h1) 计划在 x(阈值)ms 后运行,policy.halfOpenTimer => h1

  3. 锁被释放

  4. 线程 T2 获取锁。

  5. 消费者已经停止,状态仍然是 OPEN,policy.openedAt = t2 (t2 > t1),halfOpenTask (h2 - new instance) 计划在 x 毫秒后运行。policy.halfOpenTimer => h2。

  6. 一直以来,h1 计时器都在倒计时。一旦计时器完成,HalfOpen 任务就会排队并启动。

  7. 在 HalfOpen 任务的运行方法中 ->
    7.a 取消 policy.halfOpenTimer (h2) 7.b 所以 h2 半开计划任务被取消 7.c 调用 this.calculateState()

            7.c.1  check the state is OPEN and if elapsedTime >= threshold X  , then call half-open handler
                   but the  elapsedTime is currentTs (t1+x) - openedAt(t2) so the condition is never 
                   true. So the half-open handler is never called. 
    

这可能发生在并发消费者 > 1 的任何骆驼路线上。

有没有人遇到过类似的问题?

标签: apache-kafkaapache-camel

解决方案


它发生在一个 concurrentConsumers=1 以及 concurrencyLimit: 50 显然计时器永远不会关闭电路的情况下。这是记录的最后一条语句。2021-08-04 15:56:59.473 调试 99845 --- [askExecutor-104] oactThrottlingExceptionRoutePolicy:打开电路...

此外,ThrottlingExceptionRoutePolicy.failures 只是在 failureWindow 之外保持递增。

    protected boolean isThresholdExceeded() {
    boolean output = false;
    this.logState();
    if (this.failures.get() >= this.failureThreshold && this.lastFailure >= System.currentTimeMillis() - this.failureWindow) {
        output = true;
    }

    return output;
}

只要失败> = failureThreshold(将在某个时候)并且lastFailure发生在failureWindow内,“输出”就会为真。除非调用 closeCircuit,否则不会设置失败。那是一个错误。


推荐阅读