首页 > 解决方案 > flink 数据不是由 timewindow 算子中的 process 函数处理的

问题描述

我有一个时间窗口,我试图确定我是否在一段时间内获得了新密钥。我正在通过 kafka 推送数据,当我调试它时,我看到数据正在到达keyby方法,但它没有到达process方法并且没有被收集器收集。我BoundedOutOfOrdernessTimestampExtractor用于分配水印:

    case class Src(qip:Ip, ref: Ip, ts: Long) extends FooRequest

    class TsExtractor extends BoundedOutOfOrdernessTimestampExtractor[Src](Time.hours(3)){
      override def extractTimestamp(element: Src): Long = element.ts
    }

    class RefFilter extends ProcessWindowFunction[Src, IpDetectionSrc, String, TimeWindow]{
      private lazy val stateDescriptor = new ValueStateDescriptor("refFilter",  createTypeInformation[String])

      override def process(key: String, context: Context, elements: Iterable[Src], out: Collector[IpDetectionSrc]): Unit = {
        println(s"RefIpFilter processing $key")//data is not getting here 
        if(Option(context.windowState.getState(stateDescriptor).value()).isEmpty){
          println(s"new key found $key") //data is not getting here also 
          context.windowState.getState(stateDescriptor).update(key)
          out.collect(elements.head)
        }
      }
    }

lazy val env: StreamExecutionEnvironment =
    setupEnv(StreamExecutionEnvironment.getExecutionEnvironment)(300000,Some(stateDir), Some(TimeCharacteristic.EventTime))

 lazy val src: DataStream[FooRequest] = env.addSource(consumer)

 lazy val uniqueRef:DataStream[FooRequest] => DataStream[Src] = src => src 
        .flatMap(new FlatMapFunction[FooRequest,Src ]{
          override def flatMap(value: FooRequest, out: Collector[Src]): Unit =   value match {
            case r: Src =>
              out.collect(r)
            case invalid =>
              log.warn(s"filtered unexpected request $invalid")
          }
        })
        .assignTimestampsAndWatermarks(new TsExtractor)
        .keyBy(r => r.ref)
        .timeWindow(Time.seconds(120))
        .allowedLateness(Time.seconds(360))
        .process(new RefFilter)

uniqueRef(src).addSink(sink)
env.execute()

任何帮助将不胜感激

标签: scalaapache-flinkflink-streaming

解决方案


BoundedOutOfOrdernessTimestampExtractor跟踪它迄今为止看到的最高时间戳,并产生水印,该水印在配置的延迟后(在这种情况下为三个小时)。这些水印会定期生成,默认为每 200 毫秒。所以只有一个事件,水印会比这个事件晚3个小时,并且永远不会触发窗口。此外,对于有限输入,作业将在处理完所有事件后停止运行。

context.windowState是每个窗口的状态,具有有限的生命周期。每个 2 分钟窗口都有自己的实例,一旦窗口允许的延迟过期,它就会被清除。如果您想要具有全局范围的键控窗口状态,并且具有无限的生命周期,请context.globalState改用。


推荐阅读