首页 > 解决方案 > 关于具有动态(自我进化)键控状态的有效检查点

问题描述

在 KeyedCoProcessFunction 中,我正在管理由第三方库模型组成的键控状态。这些模型是在接收到. 内控制流上的新数据时创建的processElement1。因为模型是自我进化的,从某种意义上说,它们有自己的内部状态,我需要确保它们在modelsBytes状态变化时被序列化。我的第一次尝试是这样的:

class MyOperator
  extends KeyedCoProcessFunction[String, Control, Data, Prediction]
    with CheckpointedFunction {

  // To hold loaded models
  @transient private var models: HashMap[String, Model] = _

  // For serialization purposes
  @transient private var modelsBytes: MapState[String, Array[Bytes]] = _

  override def processElement1(control, ctx, ...) {
    if (restoreModels) {
      restoreModels()
    }

    // - Create new model out of `control` element
    // - Add it to `models` keyed state
  }

  override def processElement2(data, ctx, ...) {
    if (restoreModels) {
      restoreModels()
    }

    // - Send `data` element to the corresponding models
    // This will update their internal states
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Suspicious, wishful-thinking code that compiles and runs just fine
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes(v))
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    modelsBytes = context.getKeyedStateStore.getMapState[String](
      new MapStateDescriptor("modelsBytes", classOf[String])
    )

    if (context.isRestored) restoreModels = true
  }

}

所以,这个想法是用来snapshotState覆盖. 我之所以尝试这种方法是因为序列化模型 ( ) 可能是一项昂贵的操作。因此,当检查点到来时,我更愿意为每个模型执行一次。这种方法的问题在于它可能在本质上/概念上是错误的。这就是为什么,即使其中的代码编译并运行得很好,请注意我指的是一个键控状态片段而没有传入上下文,所以根本不清楚我真正开始使用的键是什么。我编写了一个小测试来验证检查点,并且我观察到有时我会返回一个空状态,即使 modelsBytes 状态条目在modelsBytesmodel.toBytessnapshotStatekeyedsnapshotState. 因此,像这样对我的模型进行快照似乎根本不可靠。让我感到困惑的是,完全允许用户这样做,也许该put方法应该引发异常以明确首先需要键控状态,否则它会给出错误的希望并可能导致难以发现错误。事实上,这不应该被认为是一个错误吗?

当然,我的另一个选择是在processElement2向它们发送新数据元素之后序列化我的模型。但是,不断序列化我的模型以进行更新modelsBytes可能代价高昂。

处理这种情况的最有效方法是什么?

标签: apache-flink

解决方案


推荐阅读