apache-flink - Flink re-scalable keyed stream 有状态函数
问题描述
我有以下 Flink 作业,我尝试使用后端类型 RockDB 的键控流状态函数 (MapState),
environment
.addSource(consumer).name("MyKafkaSource").uid("kafka-id")
.flatMap(pojoMapper).name("MyMapFunction").uid("map-id")
.keyBy(new MyKeyExtractor())
.map(new MyRichMapFunction()).name("MyRichMapFunction").uid("rich-map-id")
.addSink(sink).name("MyFileSink").uid("sink-id")
MyRichMapFunction 是一个有状态的函数,它扩展了 RichMapFunction,它具有以下代码,
public static class MyRichMapFunction extends RichMapFunction<MyEvent, MyEvent> {
private transient MapState<String, Boolean> cache;
@Override
public void open(Configuration config) {
MapStateDescriptor<String, Boolean> descriptor =
new MapStateDescriptor("seen-values", TypeInformation.of(new TypeHint<String>() {}), TypeInformation.of(new TypeHint<Boolean>() {}));
cache = getRuntimeContext().getMapState(descriptor);
}
@Override
public MyEvent map(MyEvent value) throws Exception {
if (cache.contains(value.getEventId())) {
value.setIsSeenAlready(Boolean.TRUE);
return value;
}
value.setIsSeenAlready(Boolean.FALSE);
cache.put(value.getEventId(), Boolean.TRUE)
return value;
}
}
将来,我想重新调整并行度(从 2 到 4),所以我的问题是,如何实现可重新缩放的键控状态,以便在更改并行度后,我可以将相应的缓存键控数据获取到其相应的任务槽. 我试图探索这个,我在这里找到了一个文档. 据此,可以通过使用 ListCheckPointed 接口实现可重新扩展的操作员状态,该接口为此提供了 snapshotState/restoreState 方法。但不确定如何实现可重新缩放的键控状态 (MyRichMapFunction)?我是否需要为 MyRichMapFunction 类实现 ListCheckPointed 接口?如果是,我如何根据 restoreState 方法上的新并行键哈希重新分配缓存(我的 MapState 将在启用 TTL 的情况下保存大量键,假设它在任何时间点最多保存 10 亿个键)?有人可以帮我解决这个问题,或者如果你指出任何一个很好的例子。
解决方案
您编写的代码已经可以重新扩展;Flink 的托管键状态在设计上是可重新缩放的。通过重新平衡对实例的键分配来重新调整键状态。(您可以将键控状态视为分片键/值存储。从技术上讲,发生的情况是一致的哈希用于将键映射到键组,并且每个并行实例负责一些键组。重新缩放仅涉及重新分配键实例之间的组。)
该ListCheckpointed
接口用于非键上下文中使用的状态,因此它不适合您正在做的事情。另请注意,ListCheckpointed
在 Flink 1.11 中将不推荐使用更通用的CheckpointedFunction
.
还有一件事:如果MyKeyExtractor
是通过 键控value.getEventId()
,那么您可以将其ValueState<Boolean>
用于缓存,而不是MapState<String, Boolean>
. 这是有效的,因为对于键控状态,每个键都有一个单独的 ValueState 值。只有在需要为流中的每个键存储多个属性/值对时,才需要使用 MapState。
其中大部分内容在 Flink 文档中的Hands-on Training中进行了讨论,其中包含一个与您正在做的事情非常接近的示例。
推荐阅读
- python - Gunicorn 找不到系统 Renviron 错误
- python - Python 代码作为 Prime 和函数
- c - 计算错误 C uint64_t
- mysql - 从一列城市中,您将运行什么 sql 查询来生成一列城市排列
- vim - 我可以在 VIM 正常模式下重新映射 WORD 或“W”键吗?
- dnf - 为什么 Mathematica 不像 MATLAB 或 Python 那样流行?
- python-3.x - Azure 存储容器中 Blob 的 CreatedBy/LastModifiedBy 信息
- java - CassandraOperations 更新记录错误:在 SET 部分中找到 PRIMARY KEY 部分 id
- arrays - 使用两个数组进行输出
- image - 压缩图像使其小于 4KB