scala - flink 数据不是由 timewindow 算子中的 process 函数处理的
问题描述
我有一个时间窗口,我试图确定我是否在一段时间内获得了新密钥。我正在通过 kafka 推送数据,当我调试它时,我看到数据正在到达keyby
方法,但它没有到达process
方法并且没有被收集器收集。我BoundedOutOfOrdernessTimestampExtractor
用于分配水印:
case class Src(qip:Ip, ref: Ip, ts: Long) extends FooRequest
class TsExtractor extends BoundedOutOfOrdernessTimestampExtractor[Src](Time.hours(3)){
override def extractTimestamp(element: Src): Long = element.ts
}
class RefFilter extends ProcessWindowFunction[Src, IpDetectionSrc, String, TimeWindow]{
private lazy val stateDescriptor = new ValueStateDescriptor("refFilter", createTypeInformation[String])
override def process(key: String, context: Context, elements: Iterable[Src], out: Collector[IpDetectionSrc]): Unit = {
println(s"RefIpFilter processing $key")//data is not getting here
if(Option(context.windowState.getState(stateDescriptor).value()).isEmpty){
println(s"new key found $key") //data is not getting here also
context.windowState.getState(stateDescriptor).update(key)
out.collect(elements.head)
}
}
}
lazy val env: StreamExecutionEnvironment =
setupEnv(StreamExecutionEnvironment.getExecutionEnvironment)(300000,Some(stateDir), Some(TimeCharacteristic.EventTime))
lazy val src: DataStream[FooRequest] = env.addSource(consumer)
lazy val uniqueRef:DataStream[FooRequest] => DataStream[Src] = src => src
.flatMap(new FlatMapFunction[FooRequest,Src ]{
override def flatMap(value: FooRequest, out: Collector[Src]): Unit = value match {
case r: Src =>
out.collect(r)
case invalid =>
log.warn(s"filtered unexpected request $invalid")
}
})
.assignTimestampsAndWatermarks(new TsExtractor)
.keyBy(r => r.ref)
.timeWindow(Time.seconds(120))
.allowedLateness(Time.seconds(360))
.process(new RefFilter)
uniqueRef(src).addSink(sink)
env.execute()
任何帮助将不胜感激
解决方案
BoundedOutOfOrdernessTimestampExtractor
跟踪它迄今为止看到的最高时间戳,并产生水印,该水印在配置的延迟后(在这种情况下为三个小时)。这些水印会定期生成,默认为每 200 毫秒。所以只有一个事件,水印会比这个事件晚3个小时,并且永远不会触发窗口。此外,对于有限输入,作业将在处理完所有事件后停止运行。
context.windowState
是每个窗口的状态,具有有限的生命周期。每个 2 分钟窗口都有自己的实例,一旦窗口允许的延迟过期,它就会被清除。如果您想要具有全局范围的键控窗口状态,并且具有无限的生命周期,请context.globalState
改用。
推荐阅读
- haskell - Haskell,对 Int 列表进行排序
- postgresql - 我的 postgresql 查询有问题
- html - 由于向描述中添加图标按钮,在未对齐后垂直对齐 mat-expansion-panel 标题
- c# - 从概念上比较 C# String 和 StringBuilders
- angular - Take 不是函数:Angular 7 | Observable.take 抛出运行时错误
- three.js - 三个 JS 轨道控制缩放、平移和移动监听器
- mysql - MySql 5.7 函数 UUID() 默认排序规则 - 排序规则的非法混合
- java - 一个rest api的多个数据库
- python - MSE损失函数在pytorch中是如何工作的?
- r - 闪亮的仪表板 DropdownMenu MessageItem Click