apache-flink - TTL 清理 RocksDB 列表状态时,Flink 应用程序失败
问题描述
每当 TTL 配置清除列表状态时,一个使用来自 2 个 Kafka 主题的数据并加入相关事件的应用程序就会不断失败。flatMap的逻辑与我看到的大多数逻辑有点不同。第一个流中的每个事件都可能与第二个流中的事件多次连接,因此这是一个一对多的上下文。
这是连接的代码:
val join = streamA
.connect(streamB)
.flatMap(
new JoinOneToManyStreams[StreamA, StreamB](
"streamA",
"streamB",
Time.minutes(3)
)
).uid("join")
.keyBy(_.streamB.id)
这是实现逻辑的类:
class JoinOneToManyStreams[A, B] (stateDescriptorA: String, stateDescriptorB: String, time: Time) extends RichCoFlatMapFunction[A, B, StateOut[A, B]] {
val ttlConfig: StateTtlConfig = StateTtlConfig
.newBuilder(time)
// Set TTL just once
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(1000)
.build
val streamAStateDescriptor = new ValueStateDescriptor[StateIn[A]](stateDescriptorA, classOf[StateIn[A]])
streamAStateDescriptor.enableTimeToLive(ttlConfig)
lazy val streamAState: ValueState[StateIn[A]] = getRuntimeContext.getState(streamAStateDescriptor)
val streamBStateDescriptor = new ListStateDescriptor[StateIn[B]](stateDescriptorB, classOf[StateIn[B]])
streamBStateDescriptor.enableTimeToLive(ttlConfig)
lazy val streamBState: ListState[StateIn[B]] = getRuntimeContext.getListState(streamBStateDescriptor)
override def flatMap1(streamA: A, out: Collector[StateOut[A, B]]): Unit = {
streamAState.update(StateIn[A](stateValue = streamA))
val stateB = streamBState.get
if (stateB.nonEmpty) {
streamBState.clear()
stateB.forEach(row => {
out.collect(StateOut(streamA, row.stateValue))
})
}
}
override def flatMap2(streamB: B, out: Collector[StateOut[A, B]]): Unit = {
streamBState.add(StateIn(streamB))
val stateA = streamAState.value
if (stateA != null) {
val stateB = streamBState.get
streamBState.clear()
stateB.forEach(row => {
out.collect(StateOut(stateA.stateValue, row.stateValue))
})
}
}
}
如您所见,使用了 cleanupInRocksdbCompactFilter,但我也尝试了 fullSnapshot 并且注意到了相同的行为。
错误:
Exception in thread "Thread-19" java.lang.IllegalArgumentException: classLoader cannot be null.
at com.esotericsoftware.kryo.Kryo.setClassLoader(Kryo.java:975)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:477)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:337)
at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:151)
at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:193)
at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:180)
我知道列表状态是问题,因为我为这种状态禁用了 tll 并且问题停止了。有什么办法让它工作吗?
解决方案
推荐阅读
- apache-kafka - Kafka Streams - 时间窗口关闭延迟?
- android - 如何将安卓应用数据暴露给其他应用?
- python - 如何使用限制性数据集在自己和他人之间创建 Keras 人脸分类器?
- java - 在 Java 中使用 for 循环创建地图时,Kep 的值始终保持为 0
- batch-file - 批处理脚本随机无法将文件从 Unix 共享目录复制到 Windows
- google-apps-script - 如果我的代码没有对 Google+ 的可见引用,Google+ OAuth 关闭会影响我吗?
- r - 如何使用 lintr::lint() 获得非零退出状态以使构建失败
- javascript - react, typescript - 无状态组件和普通组件的类型
- angular - 角反应形式或条件验证器
- java - mysql连接的合理connectionTimeout?