scala - Flink window operator schema evolution - savepoint deserialization fails
Problem description
I run a simple streaming job that counts hourly impressions per campaign:
.keyBy(imp => imp.campaign_id)
.window(TumblingEventTimeWindows.of(...))
.aggregate(new BudgetSpendingByImpsAggregateFunction(), new BudgetSpendingByImpsWindowFunction())
The aggregate function sums the impressions:
class BudgetSpendingByImpsAggregateFunction extends
  AggregateFunction[ImpressionEvent, BudgetSpending, BudgetSpending]{
  override def add(value: ImpressionEvent, accumulator: BudgetSpending): BudgetSpending = {
    accumulator + value
  }
  ...
}
BudgetSpending is just a simple Scala case class used as the accumulator:
case class BudgetSpending(var impressions: Int = 0){
def +(imp: ImpressionEvent): BudgetSpending =_
}
I need to add a new counter to the BudgetSpending accumulator class to track the amount spent.
@SerialVersionUID(-7854299638715463891L)
case class BudgetSpending(var impressions: Int = 0, var spent: Double = 0)
But when I run the Flink job from a savepoint taken with the previous version, I get this error:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_74af6afa20f38ce575bfc2d1386aa434_(1/2) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
    ... 5 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
    at ga.p2502.streams.flink.budget.spending.HourlyBudgetSpendingJob$$anon$18$$anon$8.createInstance(HourlyBudgetSpendingJob.scala:60)
    at ga.p2502.streams.flink.budget.spending.HourlyBudgetSpendingJob$$anon$18$$anon$8.createInstance(HourlyBudgetSpendingJob.scala:60)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:133)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:133)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:430)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:315)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:95)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
    ... 7 more
This is a very common use case, but Flink seems to have trouble with it. Do you know how to handle case class evolution in Flink?
Thanks
Solution