apache-flink - Flink - 键控过程功能中的Java类成员
问题描述
我有以下 flink keyedprocessfunction。我基本上是在尝试实现状态设计模式。
public AlertProcessor extends KeyedProcessFunction<Tuple2<String, String>, Event1, Event2> {
private transient AlertState currentState;
private transient AlertState activeAlertState;
private transient AlertState noActiveAlertState;
private transient AlertState resolvedAlertState;
@Override
public void open(Configuration parameters) {
activeAlertState = new ActiveAlertState();
noActiveAlertState = new NoActiveAlertState();
resolvedAlertState = new ResolvedAlertState();
}
@Override
public processElement(Event1 event1, Context ctx, Collector<Event2> out) throws Exception {
// Would the below if condition work for multiple keys?
if (currentAlertState == null) {
currentAlertState = noActiveAlertState;
}
currentAlertState.handle(event1, out);
}
private interface AlertState {
void handle(Event1 event1, Collector<Event2> out);
}
private class ActiveAlertState implements AlertState {
void handle(Event1 event1, Collector<Event2> out) {
logger.debug("Moving to no alertState");
// Do something and push some Event2 to out
currentAlertState = resolvedActiveAlertState;
}
}
private class NoActiveAlertState implements AlertState {
void handle(Event1 event1, Collector<Event2> out) {
logger.debug("Moving to no alertState");
// Do something and push some Event2 to out
currentAlertState = activeAlertState;
}
}
private class ResolvedAlertState implements AlertState {
void handle(Event1 event1, Collector<Event2> out) {
logger.debug("Moving to no alertState");
// Do something and push some Event2 to out
currentAlertState = noActiveAlertState;
}
}
}
我的问题是——
- 流中的每个键是否会有一个 AlertProcessor 实例(或对象)?换句话说, currentAlertState 对象是否每个键都是唯一的?或者这个 AlertProcessor 操作符的每个实例都会有一个 currentAlertState?
如果 currentAlertState 是运算符的每个实例,那么此代码将不会真正起作用,因为 currentAlertState 将被不同的键覆盖。我的理解正确吗?
我可以将 currentAlertState 存储为键控状态,并为每个 processElement() 调用初始化它。如果我这样做,我不需要在 handle() 实现中将 currentAlertState 分配或设置为下一个状态,因为 currentAlertState 无论如何都会根据 flink 状态进行初始化。
有没有更好的方法在 flink 中实现状态设计模式并且仍然减少创建的状态对象的数量?
解决方案
将在管道的每个并行实例(每个任务槽)中创建一个AlertProcessor
实例,并将在该槽处理的所有键上多路复用。
如果 currentAlertState 是运算符的每个实例,那么此代码将不会真正起作用,因为 currentAlertState 将被不同的键覆盖。我的理解正确吗?
正确的。您应该对 使用键控状态currentAlertState
,这将导致在状态后端中为每个不同的键创建一个条目。
推荐阅读
- sql-server - 从查询中提取数据到变量
- flutter - Dart 中的嵌套 Map 操作
- amazon-web-services - 亚马逊短信发送状态和收到的短信
- javascript - 切换显示/隐藏元素,其中默认刷新为隐藏
- android - 没有构建器可用于构建“org.jetbrains.plugins.gradle.model.internal.DummyModel”类型的模型
- go - 停止 goroutine 在 bufio.NewScanner 上等待
- socat - 使用 Socat 监听多个端口(端口范围)
- google-analytics - 增强的电子商务错误数据
- java - 如何读取第 12000 个字符后的一行?
- selenium-webdriver - OpenQA.Selenium.WebDriverException: [windowHandle] 不是顶级窗口句柄解决方案