scala - Flink - 如何跨多个任务槽聚合和查询丰富的接收器函数状态
问题描述
我实现了一个丰富的接收器函数,它根据调用的对象执行一些网络调用。我希望能够计算这些事件的元数据,这些元数据由事件中包含的一些上下文信息(事件的批处理 ID)作为键控,并将这些元数据公开给外部系统。
例如,一个事件如下所示:
case class MyEvent(batchId: String, eventId: String, moreInformation: ...)
class MySink(...) extends RichSinkFunction[MyEvent]
{
override def open(parameters: Configuration): Unit = {
...
}
override def close(): Unit = {
...
}
override def invoke(event: MyEvent) = {
// some processing is done here
....
//
...
if (success) {
I want to save the meta data here per event.batchId
state.count.number.of.events.processed.for.event.batchId
}
}
}
在另一个地方,我希望能够以某种方式查询为 batchId 处理了多少事件的值
解决方案
几个选项:
计划 A:使用 Metric 对象和 MetricReporter 将数据公开给外部系统。这样做的缺点是指标没有检查点,如果有很多 batchId,您最终可能会用大量无法被 GC 处理的指标污染指标系统。
计划 B:将您的 RichSinkFunction 重写为 RichFlatMap(或 ProcessFunction),它发出持有 (batchId, number.of.events.in.batchId) 的元组流。您可以通过 batchId 键入此流,然后使用 KeyedProcessFunction 中的键入状态(例如)通过可查询状态存储和公开此状态。这样做的缺点是可查询状态只允许点查询(一次一个键)。
计划 C:在此变体中,外部系统可以查询在计划 B 中创建的状态,方法是将查询注入到广播到 KeyedBroadcastProcessFunction 的流中,该 KeyedBroadcastProcessFunction 包含键控 state.count.number.of.events.processed.for.event.batchId数据。然后您可以在 KeyedBroadcastProcessFunction 的 processBroadcastElement 方法中使用 ctx.applyToKeyedState 来响应这些查询。有关示例,请参阅其中一个 Flink 培训练习。
计划 D:将 B(或 C)的结果写入 redis、elasticsearch 或其他一些可查询的数据存储,并让外部系统从那里获取此信息。
推荐阅读
- c - 如何将以下 gstreamer 命令行转换为 c
- python - 如何使用 get_loc 获取特定标签的位置?
- go - 如何在循环中访问列表中的结构字段
- javascript - Webpack:Quasar 需要未使用的 knex 驱动程序依赖项
- typescript - NestJS 使用回调实现 express POST 调用
- javascript - Nvd3中如何改变y轴刻度?
- node.js - Node worker 以代码 3 退出时如何捕获异常
- reporting-services - SSRS 报告 - 在运行另一个报告时运行一组报告
- ios - youtube 在播放按钮上嵌入 iframe 在 iphone 6 中不起作用
- python - Python:只有字典列表中的最后一行被写入文件