java - 在 Flink 中重新分配时间戳、水印?
问题描述
考虑我这样做:
DataStream<POJO> ds = ...
ds.assignTimestampsAndWatermarks(CustomAssigner)
.windowAll(...)
.apply(someFunction) //THIS FUNCTION CHANGES THE TIMESTAMP FIELD IN THE EVENTS
.assignTimestampsAndWatermarks(AnotherCustomAssigner)
这是有效的吗?我不知道水印/时间戳是全局的还是仅保留在数据流中?
编辑
class POJO{
int timestamp;
String someDetail; //key by this
...
}
数据流 ds = ....
ds.assignTimeStampsAndWatermarks(new AssignerWithPunctuatedWatermarks(){
long maxTS = Long.MIN_VALUE;
Watermarks checkAndGetNextWater(POJO, p, long l){
maxTS = max(...)
return new Watermarks(maxTS);
}
long ExtractTS(POJO p, long l){
maxTS = max(...)
return p.timeStamp;
}
}).keyBy(someDetail property)
.window(TumblingWindow(1 min))
.apply(new AllWindowFunction<POJO, POJO, String, TimeWindow>(){
public void apply(...){
POJO newPOJO = ...;
for(POJO it : iterable){
newPOJO.timeStamp += ...
}
collector.collect(newPOJO);
}
})
现在我想知道
如果我应该再次分配时间戳,因为我想做windowAll
,然后再分配apply
一次。
assignTimestamp...
.windowAll(..)
.apply(some other allwindow function)
解决方案
您不应再次调用 assignTimestampsAndWatermarks。Flink 将忽略由 WindowFunction 创建的 POJO 中的时间戳,并使用从该窗口结束时的时间派生的时间戳为包装这些事件的流记录添加时间戳。通常这工作得很好,尽管后续窗口需要覆盖一个时间帧,该时间帧是第一个的整数倍。
如果您正在尝试构建一个应该重新加时间戳并拥有自己的新水印的全新流,那么再次调用 assignTimestampsAndWatermarks 可能会起作用。
推荐阅读
- python - 在字符串中嵌入值时,Python 中的 %s 和 % 有什么区别?
- swift - 如何从组合的 iOS/Watch Xcode 项目中仅将独立的 Apple Watch 应用程序发布到 App Store
- reactjs - 带有 Firebase 托管的 Github CI 上 React 应用程序的绝对路径
- python - Dnspython 无法解决任何问题
- python - 使用open cv在python中画一个圆圈但不了解机制
- time-complexity - 两个数字相乘的时间复杂度
- java - 为什么 Java 使用 -> 而不是 => 来表示 lambda 函数?
- objective-c - 在 Objective C 中启动 NSColorPanel
- python - vscode格式化具有许多元素的列表变量
- javascript - 如何使用 javascript 在 url 中传递变量并使用 php 接收它们?