apache-flink - Flink 获取 KeyedState 状态值并在另一个 Stream 中使用
问题描述
我知道键控状态属于它的键,只有当前键访问它的状态值,其他键不能访问不同键的状态值。
我尝试使用相同的密钥但在不同的流中访问状态。可能吗?
如果不可能,那么我将有 2 个重复数据?
不:我需要两个流,因为它们每个都有不同的时间窗口和不同的实现。
这是示例(我知道 keyBy(somthing) 对于两个流操作都是相同的):
public class Sample{
streamA
.keyBy(something)
.timeWindow(Time.seconds(4))
.process(new CustomMyProcessFunction())
.name("CustomMyProcessFunction")
.print();
streamA
.keyBy(something)
.timeWindow(Time.seconds(1))
.process(new CustomMyAnotherProcessFunction())
.name("CustomMyProcessFunction")
.print();
}
public class CustomMyProcessFunction extends ProcessWindowFunction<..>
{
private Logger logger = LoggerFactory.getLogger(CustomMyProcessFunction.class);
private transient ValueState<SimpleEntity> simpleEntityValueState;
private SimpleEntity simpleEntity;
@Override
public void open(Configuration parameters) throws Exception
{
ValueStateDescriptor<SimpleEntity> simpleEntityValueStateDescriptor = new ValueStateDescriptor<SimpleEntity>(
"sample",
TypeInformation.of(SimpleEntity.class)
);
simpleEntityValueState = getRuntimeContext().getState(simpleEntityValueStateDescriptor);
}
@Override
public void process(...) throws Exception
{
SimpleEntity value = simpleEntityValueState.value();
if (value == null)
{
SimpleEntity newVal = new SimpleEntity("sample");
logger.info("New Value put");
simpleEntityValueState.update(newVal);
}
...
}
...
}
public class CustomMyAnotherProcessFunction extends ProcessWindowFunction<..>
{
private transient ValueState<SimpleEntity> simpleEntityValueState;
@Override
public void open(Configuration parameters) throws Exception
{
ValueStateDescriptor<SimpleEntity> simpleEntityValueStateDescriptor = new ValueStateDescriptor<SimpleEntity>(
"sample",
TypeInformation.of(SimpleEntity.class)
);
simpleEntityValueState = getRuntimeContext().getState(simpleEntityValueStateDescriptor);
}
@Override
public void process(...) throws Exception
{
SimpleEntity value = simpleEntityValueState.value();
if (value != null)
logger.info(value.toString()); // I expect that SimpleEntity("sample")
out.collect(...);
}
...
}
解决方案
正如已经指出的那样,状态对于单个操作员实例来说总是本地的。它不能被共享。
但是,您可以做的是将状态更新从持有状态的操作员流式传输到需要它的其他操作员。使用侧面输出,您可以创建复杂的数据流,而无需共享状态。
推荐阅读
- php - 有没有办法使用 php 对十六进制颜色进行分组
- android - 如何将 ListView 按钮拉伸到屏幕的全宽
- mysql - 创建两个表时mysql错误
- java - 如何读取文件并保存到 hashmap 中,然后将第一个元素保存为键,其余元素保存在一个集合中?
- c++ - 常量“C”占用的空间(以字节为单位)
- python - 如何使用Figure(matplotlib FigureCanvasQTAgg)绘制具有多个y轴的共享xaxis子图?
- html - 如何停止针对不同屏幕分辨率调整 div 大小
- gradle - Intellij 2018.2:无法同步 Gradle 1.9 项目
- android - 任务':app:mergeDebugResources'的Jenkins执行失败
- java - Selenium - 无法获取子窗口的窗口句柄并且执行时子窗口没有关闭