首页 > 解决方案 > 使用度量的 Flink 事件计数

问题描述

我在 kafka 中有一个主题,我在其中获得了 json 格式的多种类型的事件。我创建了一个文件流接收器,通过分桶将这些事件写入 S3。

FlinkKafkaConsumer errorTopicConsumer = new FlinkKafkaConsumer(ERROR_KAFKA_TOPICS,
                new SimpleStringSchema(),
                properties);
        final StreamingFileSink<Object> errorSink = StreamingFileSink
                .forRowFormat(new Path(outputPath + "/error"), new SimpleStringEncoder<>("UTF-8"))
                .withBucketAssigner(new EventTimeBucketAssignerJson())
                .build();

        env.addSource(errorTopicConsumer)
                .name("error_source")
                .setParallelism(1)
                .addSink(errorSink)
                .name("error_sink").setParallelism(1);
public class EventTimeBucketAssignerJson implements BucketAssigner<Object, String> {

    @Override
    public String getBucketId(Object record, Context context) {
        StringBuffer partitionString = new StringBuffer();
        Tuple3<String, Long, String> tuple3 = (Tuple3<String, Long, String>) record;
        try {
            partitionString.append("event_name=")
                    .append(tuple3.f0).append("/");

            String timePartition = TimeUtils.getEventTimeDayPartition(tuple3.f1);
            partitionString.append(timePartition);
        } catch (Exception e) {
            partitionString.append("year=").append(Constants.DEFAULT_YEAR).append("/")
                    .append("month=").append(Constants.DEFAULT_MONTH).append("/")
                    .append("day=").append(Constants.DEFAULT_DAY);
        }
        return partitionString.toString();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

现在我想将每个事件的每小时计数发布为 prometheus 的指标,并在其上发布一个 grafana 仪表板。

所以请帮助我如何使用 flink 指标实现每个事件的每小时计数并发布到 prometheus。

谢谢

标签: streamapache-flinkflink-streaming

解决方案


通常,这是通过简单地为请求创建一个计数器然后使用rate()Prometheus 中的函数来完成的,这将为您提供给定时间内的请求率。

但是,如果您出于某种原因想要自己执行此操作,那么您可以执行类似于org.apache.kafka.common.metrics.stats.Rate. 因此,在这种情况下,您需要收集样本列表及其收集时间,以及您要用于计算速率的窗口大小,然后您可以简单地进行计算,即删除样本超出范围并已过期,然后只需计算窗口中有多少样本。

然后,您可以将 设置Gauge为计算值。


推荐阅读