scala - Flink的DataStream的timeWindow有问题[求助]
问题描述
我是 Flink 的初学者。我可以实现 Flink 流。我还练习了关于 KeyedStream 的 timeWindow。现在想用Flink关于DataStream的timeWindow来实现批量操作。我尝试了一些代码,但它不起作用。我不知道如何达到预期的效果。
val resultDataStream: DataStream[HBaseOperation] = env
.addSource(kafkaUserEventSource)
.assignTimestampsAndWatermarks(new Launcher.CustomWatermarkExtractor(Time.hours(24)))
.flatMap { line =>
import scala.collection.JavaConverters._
val tableInfo: mutable.Buffer[TableInfo] = JSON.parseArray(line.columnValueList, classOf[TableInfo]).asScala
line.eventType match {
case EventType.INSERT => tableInfo.map {
row => HBaseOperation(line.eventType, s"${line.dbName}.${line.tableName}", HBASE_CFNAME, tableInfo(0).columnValue, row.columnName, row.columnValue)
}
case EventType.UPDATE => tableInfo.filter(_.isValid).map {
row => HBaseOperation(line.eventType, s"${line.dbName}.${line.tableName}", HBASE_CFNAME, tableInfo(0).columnValue, row.columnName, row.columnValue)
}
case EventType.DELETE => List(HBaseOperation(line.eventType, s"${line.dbName}.${line.tableName}", HBASE_CFNAME, tableInfo(0).columnValue, null, null))
}
}
resultDataStream.print()//Here it works
//batch operations
val value: AllWindowedStream[HBaseOperation, TimeWindow] = resultDataStream.timeWindowAll(Time.seconds(5))
val resultDataStream2: DataStream[BatchHBaseOperation] = value.apply(new RichAllWindowFunction[HBaseOperation, BatchHBaseOperation, TimeWindow] {
override def apply(window: TimeWindow, input: Iterable[HBaseOperation], out: Collector[BatchHBaseOperation]): Unit = {
val ops: ListBuffer[HBaseOperation] = ListBuffer[HBaseOperation]()
input.foreach(op => ops += op)
out.collect(BatchHBaseOperation(ops.toList))
}
})//Doesn't work
resultDataStream2.print()
//resultDataStream2.addSink(HBaseUtil.putMapData(_))
env.execute("KafkaHBaseApp")
解决方案
你的水印生成器的细节不清楚,但是如果它是基于Flink的有界乱序水印,那么CustomWatermarkExtractor(Time.hours(24))
意味着直到24小时数据(加上5秒)才会第一次触发时间窗口) 已处理。这可以解释为什么它似乎不起作用。
推荐阅读
- google-bigquery - 如何将大查询 event_timestamps 转换为 PST?
- css - 对于 React Native,行中的 Flexbox 内容存在问题
- postgresql - 使用条件或查询 dsl 查找给定模式中的所有表名
- javascript - Tinymce 图片上传 - 保存到数据库
- office-js - Office.context.document.settings.get 无法跨多个平台使用同一个 Word 加载项
- parallel-processing - PyOpenCL - 多维缩减内核
- html - 使用正则表达式解析广泛的 HTML 表
- javascript - 在反应状态下附加多维数组
- wcf - PowerShell WCF“基础连接已关闭:接收时发生意外错误。”
- android - 让 Android 模拟器与 iPhone 热点一起工作