首页 > 解决方案 > Flink的DataStream的timeWindow有问题[求助]

问题描述

我是 Flink 的初学者。我可以实现 Flink 流。我还练习了关于 KeyedStream 的 timeWindow。现在想用Flink关于DataStream的timeWindow来实现批量操作。我尝试了一些代码,但它不起作用。我不知道如何达到预期的效果。

val resultDataStream: DataStream[HBaseOperation] = env
  .addSource(kafkaUserEventSource)
  .assignTimestampsAndWatermarks(new Launcher.CustomWatermarkExtractor(Time.hours(24)))
  .flatMap { line =>
    import scala.collection.JavaConverters._
    val tableInfo: mutable.Buffer[TableInfo] = JSON.parseArray(line.columnValueList, classOf[TableInfo]).asScala
    line.eventType match {
      case EventType.INSERT => tableInfo.map {
        row => HBaseOperation(line.eventType, s"${line.dbName}.${line.tableName}", HBASE_CFNAME, tableInfo(0).columnValue, row.columnName, row.columnValue)
      }
      case EventType.UPDATE => tableInfo.filter(_.isValid).map {
        row => HBaseOperation(line.eventType, s"${line.dbName}.${line.tableName}", HBASE_CFNAME, tableInfo(0).columnValue, row.columnName, row.columnValue)
      }
      case EventType.DELETE => List(HBaseOperation(line.eventType, s"${line.dbName}.${line.tableName}", HBASE_CFNAME, tableInfo(0).columnValue, null, null))
    }
  }
resultDataStream.print()//Here it works

//batch operations
val value: AllWindowedStream[HBaseOperation, TimeWindow] = resultDataStream.timeWindowAll(Time.seconds(5))
val resultDataStream2: DataStream[BatchHBaseOperation] = value.apply(new RichAllWindowFunction[HBaseOperation, BatchHBaseOperation, TimeWindow] {
  override def apply(window: TimeWindow, input: Iterable[HBaseOperation], out: Collector[BatchHBaseOperation]): Unit = {
    val ops: ListBuffer[HBaseOperation] = ListBuffer[HBaseOperation]()
    input.foreach(op => ops += op)
    out.collect(BatchHBaseOperation(ops.toList))
  }
})//Doesn't work
resultDataStream2.print()
//resultDataStream2.addSink(HBaseUtil.putMapData(_))
env.execute("KafkaHBaseApp")

标签: scalaapache-flink

解决方案


你的水印生成器的细节不清楚,但是如果它是基于Flink的有界乱序水印,那么CustomWatermarkExtractor(Time.hours(24))意味着直到24小时数据(加上5秒)才会第一次触发时间窗口) 已处理。这可以解释为什么它似乎不起作用。


推荐阅读