首页 > 解决方案 > 带有断路器的 Kafka 消费者,使用 Resilience4j 重试模式

问题描述

我需要一些帮助来了解如何使用 Spring boot、Kafka、Resilence4J 提出解决方案,以实现来自我的 Kafka Consumer 的微服务调用。假设如果微服务关闭,那么我需要使用断路器模式通知我的 Kafka 消费者以停止获取消息/事件,直到微服务启动并运行。

标签: spring-bootapache-kafkaspring-cloud-streamresilience4j

解决方案


使用 Spring Kafka,您可以根据 CircuitBreaker 状态转换使用pause和方法。resume我为此找到的最佳方法是将其定义为带有 @Configuration 注释的“主管”。还使用了 Resilience4j。

@Configuration
public class CircuitBreakerConsumerConfiguration {

public CircuitBreakerConsumerConfiguration(CircuitBreakerRegistry circuitBreakerRegistry, KafkaManager kafkaManager) {
    circuitBreakerRegistry.circuitBreaker("yourCBName").getEventPublisher().onStateTransition(event -> {
  
        switch (event.getStateTransition()) {
            case CLOSED_TO_OPEN:
            case CLOSED_TO_FORCED_OPEN:
            case HALF_OPEN_TO_OPEN:
                kafkaManager.pause();
                break;
            case OPEN_TO_HALF_OPEN:
            case HALF_OPEN_TO_CLOSED:
            case FORCED_OPEN_TO_CLOSED:
            case FORCED_OPEN_TO_HALF_OPEN:
                kafkaManager.resume();
                break;
            default:
                throw new IllegalStateException("Unknown transition state: " + event.getStateTransition());
        }
    });
   }
}

这是我结合使用 KafkaManager 注释的@Component

@Component
public class KafkaManager {
  private final KafkaListenerEndpointRegistry registry;

  public KafkaManager(KafkaListenerEndpointRegistry registry) {
    this.registry = registry;
  }
  public void pause() {   
    registry.getListenerContainers().forEach(MessageListenerContainer::pause);
  }

  public void resume() {
    registry.getListenerContainers().forEach(MessageListenerContainer::resume);
  }
}

此外,我的消费者服务如下所示:

  @KafkaListener(topics = "#{'${topic.name}'}", concurrency = "1", id = "CBListener")
public void receive(final ConsumerRecord<String, ReplayData> replayData, Acknowledgment acknowledgment) throws
        Exception {

    try {
        httpClientServiceCB.receiveHandleCircuitBreaker(replayData);
        acknowledgement.acknowledge();
    } catch (Exception e) {
        acknowledgment.nack(1000);
    }
}

@CircuitBreaker注释:

@CircuitBreaker(name = "yourCBName")
public void receiveHandleCircuitBreaker(ConsumerRecord<String, ReplayData> replayData) throws
        Exception {
    try {
        String response = restTemplate.getForObject("http://localhost:8081/item", String.class);
    } catch (Exception e                                                                       ) {
       
        // throwing the exception is needed to trigger the Circuit Breaker state change
        throw new Exception();
    }
}

这还补充了以下内容application.properties

  resilience4j.circuitbreaker.instances.yourCBName.failure-rate-threshold=80
  resilience4j.circuitbreaker.instances.yourCBName.sliding-window-type=COUNT_BASED
  resilience4j.circuitbreaker.instances.yourCBName.sliding-window-size=5
  resilience4j.circuitbreaker.instances.yourCBName.wait-duration-in-open-state=10000
  resilience4j.circuitbreaker.instances.yourCBName.automatic-transition-from-open-to-half-open-enabled=true
  spring.kafka.consumer.enable.auto.commit = false
  spring.kafka.listener.ack-mode = MANUAL_IMMEDIATE

也看看https://resilience4j.readme.io/docs/circuitbreaker


推荐阅读