java - Kafka 消费者自定义 MetricReporter 未收到指标
问题描述
我创建了一个实现如下的类org.apache.kafka.common.metrics.KafkaMetric
:
public class DatadogMetricTracker implements MetricsReporter {
@Override
public void configure(Map<String, ?> configs) {
System.out.println(configs);
}
@Override
public void init(List<KafkaMetric> metrics) {
System.out.println(metrics);
}
@Override
public void metricChange(KafkaMetric metric) {
System.out.println(metric.metricName().name() + ": " + metric.value() + " tags: " + metric.metricName().tags());
}
@Override
public void metricRemoval(KafkaMetric metric) {
}
@Override
public void close() {
}
}
然后我在设置 Kafka 道具时将该类注册为指标报告器:
properties.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.myco.utils.DatadogMetricTracker");
当我启动我的消费者时,configure
被调用并且init
,然后metricChange
被调用一次,其中的值都是 0 或 -Infinity 的一批指标,然后它再也不会被调用。如何让我的指标记录器再次启动?
谢谢!
解决方案
检查了代码,除了最初注册指标外,不会调用 metricChange 方法。所以最好使用一些替代方法。截至目前,我将实现指标的定期读取和发布,因为在我们的案例中,Prometheus 将从我们的应用程序中抓取指标。更重要的一点是,由于已经在内部计算了值,因此将仅使用量规来填充 prometheus 的值
它适用于 JMX 报告器,因为它们创建动态 bean,并且当检查 Mbean 的属性值时(通过 JConsole 或其他东西),然后调用获取度量值的实际方法,因此它在那里工作,这里是代码:
public Object getAttribute(String name) throws AttributeNotFoundException, MBeanException, ReflectionException {
if (this.metrics.containsKey(name))
return this.metrics.get(name).metricValue();
else
throw new AttributeNotFoundException("Could not find attribute " + name);
}
推荐阅读
- c++ - 使用 ESP8266 上的 PubSubClient 将字节 * 转换为字符串
- c# - C# 处理文件错误
- python - 如何在 Python 中将列表写入文件?
- reactjs - keycloak 单点登录不适用于反应应用程序
- c# - 使用 MSAL 获取索赔
- linux - Postgresql 未在 ubuntu 20 上安装
- pyspark - 零件的 PySpark 结构化流式处理和过滤处理
- firebase - Cloud Firestore 0.14.3 版导致 Flutter 移动应用程序中的版本冲突
- c# - 如何将主窗口中的绑定数据传输到 WPF 中的另一个窗口?
- php - date() 和 DateTime::format() 的输出不同,DateTime::format() 忽略时区