首页 > 解决方案 > KeyedProcessFunction 中所有键的 Flink 通用状态

问题描述

我想为KeyedProcessFunction下面的所有状态保留一个简单的值;

class StateC() extends KeyedProcessFunction[Long, A, B] {
  var timestamp: Timestamp = _

  override def open(parameters: Configuration): Unit = {
    timestamp = assignCurrentHour()
  }

 override def processElement(item: A, ... ): Unit = {
    val currentHour = now.truncateToHour()
    if (currentHour.after(timestamp)) { 
      timestamp = assignCurrentHour()
    }
   .... 
 }
}

我只是想知道我是否在新的时间。为此,我保留timestamp变量。变量的值timestamp对于此 TaskManager 中的所有键都是通用的。所以我不需要为每个键说明。

在这种情况下,timestamp变量将task manager在新的时间处理任何事件时更新,对吧?

可以同时修改timestamp变量吗?

标签: apache-flinkflink-streaming

解决方案


您提出的实现看起来不错(但请记住,我不知道您的所有要求)。的每个并行实例都KeyedProcessFunction将具有自己的时间戳版本,并且每个实例都会在新的时间处理事件后立即更新其时间戳。AKeyedProcessFunction是单线程的:您不必担心并发更新。(该onTimer方法与其他方法同步;那里没有什么可担心的。)


推荐阅读