首页 > 解决方案 > 在 Flink 中重新分配时间戳、水印?

问题描述

考虑我这样做:

DataStream<POJO> ds = ...
ds.assignTimestampsAndWatermarks(CustomAssigner)
.windowAll(...)
.apply(someFunction) //THIS FUNCTION CHANGES THE TIMESTAMP FIELD IN THE EVENTS
.assignTimestampsAndWatermarks(AnotherCustomAssigner)

这是有效的吗?我不知道水印/时间戳是全局的还是仅保留在数据流中?

编辑

class POJO{
   int timestamp;
   String someDetail; //key by this
   ...
}

数据流 ds = ....

ds.assignTimeStampsAndWatermarks(new AssignerWithPunctuatedWatermarks(){
   long maxTS = Long.MIN_VALUE;
   Watermarks checkAndGetNextWater(POJO, p, long l){
  maxTS = max(...)
  return new Watermarks(maxTS);
}

long ExtractTS(POJO p, long l){
  maxTS = max(...)
  return p.timeStamp;
}


  }).keyBy(someDetail property)
     .window(TumblingWindow(1 min))
      .apply(new AllWindowFunction<POJO, POJO, String, TimeWindow>(){
  public void apply(...){
    POJO newPOJO = ...;
    for(POJO it : iterable){
      newPOJO.timeStamp += ...
    }
    collector.collect(newPOJO);
  }
}) 

现在我想知道

如果我应该再次分配时间戳,因为我想做windowAll,然后再分配apply一次。

assignTimestamp...
.windowAll(..)
.apply(some other allwindow function)

标签: javaapache-flink

解决方案


您不应再次调用 assignTimestampsAndWatermarks。Flink 将忽略由 WindowFunction 创建的 POJO 中的时间戳,并使用从该窗口结束时的时间派生的时间戳为包装这些事件的流记录添加时间戳。通常这工作得很好,尽管后续窗口需要覆盖一个时间帧,该时间帧是第一个的整数倍。

如果您正在尝试构建一个应该重新加时间戳并拥有自己的新水印的全新流,那么再次调用 assignTimestampsAndWatermarks 可能会起作用。


推荐阅读