apache-flink - 关于具有动态(自我进化)键控状态的有效检查点
问题描述
在 KeyedCoProcessFunction 中,我正在管理由第三方库模型组成的键控状态。这些模型是在接收到. 内控制流上的新数据时创建的processElement1
。因为模型是自我进化的,从某种意义上说,它们有自己的内部状态,我需要确保它们在modelsBytes
状态变化时被序列化。我的第一次尝试是这样的:
class MyOperator
extends KeyedCoProcessFunction[String, Control, Data, Prediction]
with CheckpointedFunction {
// To hold loaded models
@transient private var models: HashMap[String, Model] = _
// For serialization purposes
@transient private var modelsBytes: MapState[String, Array[Bytes]] = _
override def processElement1(control, ctx, ...) {
if (restoreModels) {
restoreModels()
}
// - Create new model out of `control` element
// - Add it to `models` keyed state
}
override def processElement2(data, ctx, ...) {
if (restoreModels) {
restoreModels()
}
// - Send `data` element to the corresponding models
// This will update their internal states
}
override def snapshotState(context: FunctionSnapshotContext): Unit = {
// Suspicious, wishful-thinking code that compiles and runs just fine
for ((k, model) <- models) {
modelsBytes.put(k, model.toBytes(v))
}
}
override def initializeState(context: FunctionInitializationContext): Unit = {
modelsBytes = context.getKeyedStateStore.getMapState[String](
new MapStateDescriptor("modelsBytes", classOf[String])
)
if (context.isRestored) restoreModels = true
}
}
所以,这个想法是用来snapshotState
覆盖. 我之所以尝试这种方法是因为序列化模型 ( ) 可能是一项昂贵的操作。因此,当检查点到来时,我更愿意为每个模型执行一次。这种方法的问题在于它可能在本质上/概念上是错误的。这就是为什么,即使其中的代码编译并运行得很好,请注意我指的是一个键控状态片段而没有传入上下文,所以根本不清楚我真正开始使用的键是什么。我编写了一个小测试来验证检查点,并且我观察到有时我会返回一个空状态,即使 modelsBytes 状态条目在modelsBytes
model.toBytes
snapshotState
keyed
snapshotState
. 因此,像这样对我的模型进行快照似乎根本不可靠。让我感到困惑的是,完全允许用户这样做,也许该put
方法应该引发异常以明确首先需要键控状态,否则它会给出错误的希望并可能导致难以发现错误。事实上,这不应该被认为是一个错误吗?
当然,我的另一个选择是在processElement2
向它们发送新数据元素之后序列化我的模型。但是,不断序列化我的模型以进行更新modelsBytes
可能代价高昂。
处理这种情况的最有效方法是什么?
解决方案
推荐阅读
- javascript - 根据数组中的两个或多个字符串过滤和包含
- nginx - Nginx 代理到来自同一位置的多个端口
- swift - SwiftUI:如何用 for 循环填充字典?
- java - Jflex 获取输入文件名
- c++ - 数字相乘时有关算术溢出的警告
- python - 试图打印一个排序的字典,但它给了我语法错误
- javascript - 为什么 React 在 django 项目中不起作用?
- ruby-on-rails - 如何访问rails中的对象数据
- javascript - 在加入和离开时向我的私人频道发送消息
- c# - WebMarkupMin + Razor + React:无法将标记缩小应用于以“gzip”格式编码的文本内容