Flink window operator schema evolution: savepoint deserialization fails

Problem description

I'm running a simple streaming job that counts hourly impressions per ad campaign:

.keyBy(imp => imp.campaign_id) 
.window(TumblingEventTimeWindows.of(...)) 
.aggregate(new BudgetSpendingByImpsAggregateFunction(), new BudgetSpendingByImpsWindowFunction()) 
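To make the pipeline's result concrete, here is a plain-Scala model (no Flink dependency) of what keyBy + a one-hour tumbling event-time window + a summing aggregate would emit. It is a sketch under assumptions: ImpressionEvent is a hypothetical stand-in with a campaign_id and an epoch-millisecond timestamp, the window size is one hour, windows are aligned to multiples of the size with offset 0, and timestamps are non-negative. It does not model watermarks or late data.

```scala
// Hypothetical event type; the real ImpressionEvent is not shown in the question.
final case class ImpressionEvent(campaign_id: String, timestamp: Long)

object TumblingWindowSketch {
  val HourMillis: Long = 60L * 60L * 1000L

  // Start of the tumbling window containing `ts`, with windows aligned to
  // multiples of `size` since the epoch (offset 0). Assumes ts >= 0.
  def windowStart(ts: Long, size: Long): Long = ts - (ts % size)

  // Impression count per (campaign, window start): the values the
  // keyBy/window/aggregate pipeline would produce once each window fires.
  def hourlyCounts(events: Seq[ImpressionEvent]): Map[(String, Long), Int] =
    events
      .groupBy(e => (e.campaign_id, windowStart(e.timestamp, HourMillis)))
      .map { case (key, evs) => key -> evs.size }
}
```

For example, events at 1000 ms and 3599999 ms for the same campaign fall into the window starting at 0, while an event at 3600000 ms opens the next hourly window.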

The aggregate function sums the impressions:

class BudgetSpendingByImpsAggregateFunction extends AggregateFunction[ImpressionEvent, BudgetSpending, BudgetSpending] {
    // add() must declare its return type: the accumulator after folding in one event
    override def add(value: ImpressionEvent, accumulator: BudgetSpending): BudgetSpending = {
        accumulator + value 
    } 
... 
} 

BudgetSpending is just a simple Scala case class used as the accumulator:

case class BudgetSpending(var impressions: Int = 0){ 
    def +(imp: ImpressionEvent): BudgetSpending =_ 
} 
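A runnable sketch of the whole aggregate, with assumptions stated up front: AggregateLike is a hypothetical local trait that mirrors the four methods of Flink's AggregateFunction (createAccumulator, add, getResult, merge) so the code runs without Flink; the body of + is elided in the question, so this version assumes each event counts as one impression; and it uses an immutable copy instead of the original var field. In Flink, merge is only invoked for merging window assigners such as session windows, but the interface requires it regardless.

```scala
// Hypothetical stand-in mirroring Flink's AggregateFunction[IN, ACC, OUT] methods.
trait AggregateLike[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def add(value: IN, accumulator: ACC): ACC
  def getResult(accumulator: ACC): OUT
  def merge(a: ACC, b: ACC): ACC
}

// Hypothetical event type for illustration.
final case class ImpressionEvent(campaign_id: String)

// Assumption: each ImpressionEvent counts as exactly one impression.
final case class BudgetSpending(impressions: Int = 0) {
  def +(imp: ImpressionEvent): BudgetSpending = copy(impressions = impressions + 1)
  def ++(other: BudgetSpending): BudgetSpending = BudgetSpending(impressions + other.impressions)
}

object BudgetSpendingAgg extends AggregateLike[ImpressionEvent, BudgetSpending, BudgetSpending] {
  def createAccumulator(): BudgetSpending = BudgetSpending()
  def add(value: ImpressionEvent, accumulator: BudgetSpending): BudgetSpending = accumulator + value
  def getResult(accumulator: BudgetSpending): BudgetSpending = accumulator
  // Combines partial counts, e.g. when two session windows merge.
  def merge(a: BudgetSpending, b: BudgetSpending): BudgetSpending = a ++ b
}
```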

I need to add a new counter to the BudgetSpending accumulator class to track the amount spent:

@SerialVersionUID(-7854299638715463891L) 
case class BudgetSpending(var impressions: Int = 0, var spent: Double = 0)
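A sketch of how the evolved accumulator might update both counters, assuming a hypothetical price field on ImpressionEvent (the question does not say where the spend value comes from) and again using immutable copies rather than var fields. The default value on spent keeps existing call sites such as BudgetSpending(5) compiling, but the case class now has two fields instead of one, so state written by the one-field version no longer has the shape the new serializer expects.

```scala
// Hypothetical: assumes each impression carries its cost in a `price` field.
final case class ImpressionEvent(campaign_id: String, price: Double)

@SerialVersionUID(-7854299638715463891L)
final case class BudgetSpending(impressions: Int = 0, spent: Double = 0) {
  // Counts one impression and adds its (hypothetical) price to the spend.
  def +(imp: ImpressionEvent): BudgetSpending =
    copy(impressions = impressions + 1, spent = spent + imp.price)
}
```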

But when I run the Flink job from a savepoint taken with the previous version, I get this error:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_74af6afa20f38ce575bfc2d1386aa434_(1/2) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
        ... 5 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
        at ga.p2502.streams.flink.budget.spending.HourlyBudgetSpendingJob$$anon$18$$anon$8.createInstance(HourlyBudgetSpendingJob.scala:60)
        at ga.p2502.streams.flink.budget.spending.HourlyBudgetSpendingJob$$anon$18$$anon$8.createInstance(HourlyBudgetSpendingJob.scala:60)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:133)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:133)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:430)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:315)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:95)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
        ... 7 more

This is a very common use case, but Flink seems to have trouble with it. Do you know how to handle case class evolution in Flink?

Thanks

Tags: scala, serialization, apache-flink, flink-streaming, case-class

Solution
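As of Flink 1.8, the Flink documentation lists POJO and Avro types as the only types that support state schema evolution; Scala case classes, which go through CaseClassSerializer as in the trace above, are not among them. One direction is a custom serializer whose savepoint metadata records which field layout wrote the data, so the new job can read old bytes and fill the missing field with a default. The plain-Scala sketch below shows only that core idea with java.io streams; it is not Flink's TypeSerializer or TypeSerializerSnapshot API, and the version numbers and BudgetSpendingCodec object are hypothetical. It also shows that reading old bytes with a reader that expects the new layout fails, here with EOFException, which is an analogue of the failure above rather than the same code path.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException}

final case class BudgetSpending(impressions: Int = 0, spent: Double = 0)

// Hypothetical layout versions: 1 = (impressions: Int), 2 = (impressions: Int, spent: Double).
object BudgetSpendingCodec {
  val CurrentVersion: Int = 2

  // Always writes the current (version 2) layout.
  def write(b: BudgetSpending, out: DataOutputStream): Unit = {
    out.writeInt(b.impressions)
    out.writeDouble(b.spent)
  }

  // Reads bytes produced by layout `writtenWith`. In Flink, this information
  // would have to come from metadata stored alongside the state in the savepoint.
  def read(in: DataInputStream, writtenWith: Int): BudgetSpending = writtenWith match {
    case 1 => BudgetSpending(impressions = in.readInt()) // old data: spent defaults to 0
    case 2 => BudgetSpending(in.readInt(), in.readDouble())
    case v => throw new IllegalArgumentException(s"unknown layout version $v")
  }

  // Bytes as the one-field version of the job would have written them.
  def v1Bytes(impressions: Int): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    out.writeInt(impressions)
    out.flush()
    bos.toByteArray
  }
}
```

Reading BudgetSpendingCodec.v1Bytes(42) with layout 2 runs past the end of the 4-byte record, while reading it with layout 1 yields BudgetSpending(42, 0.0). Moving the accumulator to a POJO or Avro type would opt into Flink's built-in schema evolution for later field additions, but that switch is itself a type change and is unlikely to restore the existing case-class state directly.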

