apache-flink - KeyedProcessFunction 中所有键的 Flink 通用状态
问题描述
我想为KeyedProcessFunction
下面的所有状态保留一个简单的值;
class StateC() extends KeyedProcessFunction[Long, A, B] {
var timestamp: Timestamp = _
override def open(parameters: Configuration): Unit = {
timestamp = assignCurrentHour()
}
override def processElement(item: A, ... ): Unit = {
val currentHour = now.truncateToHour()
if (currentHour.after(timestamp)) {
timestamp = assignCurrentHour()
}
....
}
}
我只是想知道我是否在新的时间。为此,我保留timestamp
变量。变量的值timestamp
对于此 TaskManager 中的所有键都是通用的。所以我不需要为每个键说明。
在这种情况下,timestamp
变量将task manager
在新的时间处理任何事件时更新,对吧?
可以同时修改timestamp
变量吗?
解决方案
您提出的实现看起来不错(但请记住,我不知道您的所有要求)。的每个并行实例都KeyedProcessFunction
将具有自己的时间戳版本,并且每个实例都会在新的时间处理事件后立即更新其时间戳。AKeyedProcessFunction
是单线程的:您不必担心并发更新。(该onTimer
方法与其他方法同步;那里没有什么可担心的。)
推荐阅读
- machine-learning - 给定上下文和可能的单词,如何计算哪个单词最适合?
- elasticsearch - Search Guard 使用 SSL 连接到远程 Elasticsearch 集群
- angular - *ngFor 中是否可以进行反射
- azureservicebus - MassTransit Azure 服务总线,设置定期计划
- json - 如何在关系表laravel 5.8中解码Json数据
- python - 在 Keras 中使用 ConvLSTM 对移动 MNIST 进行非相干帧预测
- java - 需要在 Testng 运行时跳过测试用例
- javascript - 操作包含文件路径的字符串以仅获取文件名
- swift - Swift - 超时执行网络调用的递归函数?
- excel - 从 Powershell 写入 Excel:如何设置值的单元格格式?