stream - 使用度量的 Flink 事件计数
问题描述
我在 kafka 中有一个主题,我在其中获得了 json 格式的多种类型的事件。我创建了一个文件流接收器,通过分桶将这些事件写入 S3。
FlinkKafkaConsumer errorTopicConsumer = new FlinkKafkaConsumer(ERROR_KAFKA_TOPICS,
new SimpleStringSchema(),
properties);
final StreamingFileSink<Object> errorSink = StreamingFileSink
.forRowFormat(new Path(outputPath + "/error"), new SimpleStringEncoder<>("UTF-8"))
.withBucketAssigner(new EventTimeBucketAssignerJson())
.build();
env.addSource(errorTopicConsumer)
.name("error_source")
.setParallelism(1)
.addSink(errorSink)
.name("error_sink").setParallelism(1);
public class EventTimeBucketAssignerJson implements BucketAssigner<Object, String> {
@Override
public String getBucketId(Object record, Context context) {
StringBuffer partitionString = new StringBuffer();
Tuple3<String, Long, String> tuple3 = (Tuple3<String, Long, String>) record;
try {
partitionString.append("event_name=")
.append(tuple3.f0).append("/");
String timePartition = TimeUtils.getEventTimeDayPartition(tuple3.f1);
partitionString.append(timePartition);
} catch (Exception e) {
partitionString.append("year=").append(Constants.DEFAULT_YEAR).append("/")
.append("month=").append(Constants.DEFAULT_MONTH).append("/")
.append("day=").append(Constants.DEFAULT_DAY);
}
return partitionString.toString();
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
}
现在我想将每个事件的每小时计数发布为 prometheus 的指标,并在其上发布一个 grafana 仪表板。
所以请帮助我如何使用 flink 指标实现每个事件的每小时计数并发布到 prometheus。
谢谢
解决方案
通常,这是通过简单地为请求创建一个计数器然后使用rate()
Prometheus 中的函数来完成的,这将为您提供给定时间内的请求率。
但是,如果您出于某种原因想要自己执行此操作,那么您可以执行类似于org.apache.kafka.common.metrics.stats.Rate
. 因此,在这种情况下,您需要收集样本列表及其收集时间,以及您要用于计算速率的窗口大小,然后您可以简单地进行计算,即删除样本超出范围并已过期,然后只需计算窗口中有多少样本。
然后,您可以将 设置Gauge
为计算值。
推荐阅读
- java - 如何获取 X 列并遍历其所有行?
- asp.net-mvc - C# 静态类 HtmlHelper 不是线程安全的
- tensorflow-hub - 无法使用 Tensorflow_hub 访问 URL
- rest - 行级安全性 (RLS) 可以在 Power BI 中以编程方式应用吗?
- c# - 无法加载文件或程序集 - 发布解决方案的程序集版本错误
- php - Laravel - 条件路由名称映射/动态路由
- python - Pandas groupby 月份输出不正确
- mapbox - 如何避免多边形被tileset剪裁?
- android - LiveData 与 StateFlow:我们应该从 Live 数据切换到 State Flow 吗?
- flutter - 用于 dart/flutter 测试的覆盖/模拟库函数