首页 > 解决方案 > Spring Boot:Kafka 健康指标

问题描述

我有类似下面的东西效果很好,但我更喜欢在不发送任何消息的情况下检查运行状况(不仅检查套接字连接)。我知道 Kafka 有开箱即用的 KafkaHealthIndicator 之类的东西,有人有使用它的经验或例子吗?

   public class KafkaHealthIndicator implements HealthIndicator {
   private final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);

   private KafkaTemplate<String, String> kafka;

   public KafkaHealthIndicator(KafkaTemplate<String, String> kafka) {
   this.kafka = kafka;
   }

  @Override
  public Health health() {
  try {
     kafka.send("kafka-health-indicator", "❥&quot;).get(100, TimeUnit.MILLISECONDS);
  } catch (InterruptedException | ExecutionException | TimeoutException e) {
      return Health.down(e).build();
  }
  return Health.up().build();
 }
}

标签: javaspringspring-bootapache-kafkaspring-kafka

解决方案


kafkaAdminClient.describeCluster(..)是测试 Kafka 可用性的地方。

@Configuration
public class KafkaConfig {

    @Autowired
    private KafkaAdmin kafkaAdmin;

    @Bean
    public AdminClient kafkaAdminClient() {
        return AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }

    @Bean
    public HealthIndicator kafkaHealthIndicator(AdminClient kafkaAdminClient) {
        final DescribeClusterOptions options = new DescribeClusterOptions()
            .timeoutMs(1000);

        return new AbstractHealthIndicator() {
            @Override
            protected void doHealthCheck(Health.Builder builder) throws Exception {
                // When Kafka is not connected, describeCluster() method throws
                // an exception which in turn sets this indicator as being DOWN.
                kafkaAdminClient.describeCluster(options);

                builder.up().build();
            }
        };
    }

}

如需更详细的探测,请添加:

DescribeClusterResult clusterDesc = kafkaAdminClient.describeCluster(options);
builder.up()
    .withDetail("clusterId", clusterDesc.clusterId().get())
    .withDetail("nodeCount", clusterDesc.nodes().get().size())
    .build();

推荐阅读