首页 > 解决方案 > 当消费者在一个组内时恢复 kafka 流

问题描述

我的 Spring Cloud Stream 应用程序中有一个断路器。当电路改变状态并且我的流消费者是匿名的(不在组中)时,它可以很好地暂停/恢复流。

当消费者属于一个组时,暂停流效果很好,但恢复被“忽略”,最终以超时结束并离开组。任何解释为什么会发生这种不一致的行为?

Spring Cloud Stream 版本为 3.0.8.RELEASE。

这是我的断路器状态转换处理程序:


@Component
public class CircuitBreakerKafkaStream {

  private final Logger log = LoggerFactory.getLogger(CircuitBreakerKafkaStream.class);

  private List<InputBindingLifecycle> inputBindingLifecycles;

  public CircuitBreakerKafkaStream(List<InputBindingLifecycle> inputBindingLifecycles) {
    this.inputBindingLifecycles = inputBindingLifecycles;
  }

  @Override
  // Pause or resume all of input bindings by the state of circuit breaker.
  public void transitionHandler(CircuitBreaker.State toState) {
    log.info("Circuit breaker is transitioning to {} state", toState.toString());

    if (toState == CircuitBreaker.State.OPEN) {
      log.info("Pausing kafka binder...");
      gatherInputBindings().stream().forEach(binding -> binding.pause());
    } else {
      log.info("Resuming kafka binder...");
      gatherInputBindings().stream().forEach(binding -> binding.resume());
    }
  }

  private List<Binding<?>> gatherInputBindings() {
    List<Binding<?>> inputBindings = new ArrayList<>();
    for (InputBindingLifecycle inputBindingLifecycle : this.inputBindingLifecycles) {
      Collection<Binding<?>> lifecycleInputBindings =
          (Collection<Binding<?>>)
              new DirectFieldAccessor(inputBindingLifecycle).getPropertyValue("inputBindings");
      inputBindings.addAll(lifecycleInputBindings);
    }
    return inputBindings;
  }
}

更新:我认为问题更具体。断路器具有openhalf_openclose状态。一旦电路关闭并恢复流,它会消耗一些消息,然后由于某种原因它会停止,直到发生轮询超时。

标签: apache-kafka-streamsspring-kafkaspring-cloud-stream

解决方案


您应该增加max.poll.timeout.ms,默认设置为 5 分钟。如果流未在此限制内发送轮询请求,则 kafka 代理认为该流已死。

我们在 kafka 流中寻找延迟处理的解决方案,我们希望在几分钟内延迟消息传递并增加 max.poll.timeout.ms 以避免启动流。


推荐阅读