首页 > 解决方案 > Kafka 消费者自定义 MetricReporter 未收到指标

问题描述

我创建了一个实现如下的类org.apache.kafka.common.metrics.KafkaMetric

public class DatadogMetricTracker implements MetricsReporter {

    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println(configs);
    }

    @Override
    public void init(List<KafkaMetric> metrics) {
        System.out.println(metrics);
    }

    @Override
    public void metricChange(KafkaMetric metric) {
        System.out.println(metric.metricName().name() + ": " + metric.value() + " tags: " + metric.metricName().tags());
    }

    @Override
    public void metricRemoval(KafkaMetric metric) {

    }

    @Override
    public void close() {

    }

}

然后我在设置 Kafka 道具时将该类注册为指标报告器:

properties.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.myco.utils.DatadogMetricTracker");

当我启动我的消费者时,configure被调用并且init,然后metricChange被调用一次,其中的值都是 0 或 -Infinity 的一批指标,然后它再也不会被调用。如何让我的指标记录器再次启动?

谢谢!

标签: javaapache-kafkametricsconsumerdatadog

解决方案


检查了代码,除了最初注册指标外,不会调用 metricChange 方法。所以最好使用一些替代方法。截至目前,我将实现指标的定期读取和发布,因为在我们的案例中,Prometheus 将从我们的应用程序中抓取指标。更重要的一点是,由于已经在内部计算了值,因此将仅使用量规来填充 prometheus 的值

它适用于 JMX 报告器,因为它们创建动态 bean,并且当检查 Mbean 的属性值时(通过 JConsole 或其他东西),然后调用获取度量值的实际方法,因此它在那里工作,这里是代码:

        public Object getAttribute(String name) throws AttributeNotFoundException, MBeanException, ReflectionException {
            if (this.metrics.containsKey(name))
                return this.metrics.get(name).metricValue();
            else
                throw new AttributeNotFoundException("Could not find attribute " + name);
        }

推荐阅读