apache-kafka - flink 的 Kafka 水印策略在我的应用程序中不起作用
问题描述
我使用 flink 1.13.0 版本
当我尝试通过 flink doc 使用似乎不起作用的 Kafka Watermark Strategies 时,窗口进程功能将不会运行。
而且我想通过这种方式知道,水印的时间戳将使用kafka中的消耗时间或生产时间?
我的消费者的代码是这样的:
val source = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
.setCommitOffsetsOnCheckpoints(true)
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5)))
stream = env.addSource(source)
.name(nodeCfg.nodeName)
.uid(nodeCfg.nodeName)
.setParallelism(nodeCfg.workerCount)
并像这样使用窗口:
processStream
.keyBy(_.num)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(new LatTimeAggregate(), new SignLatCalculateProcess())
.name(nodeCfg.nodeName)
.uid(nodeCfg.nodeName)
.setParallelism(nodeCfg.workerCount)
.addSink(new SignLatSink(serverConfig.smsRuleRedis))
.name("lat_count_sink")
.uid("lat_count_sink")
和这样的拓扑图:
解决方案
由于您没有在水印策略中指定时间戳分配器,因此您依赖 FlinkKafkaConsumer 为流记录分配时间戳。这仅在从 Kafka 读取的记录在其标头中有时间戳时才有效。否则,您需要实现时间戳分配器以从事件中提取时间戳。
请注意,您将无法实现 FlinkKafkaConsumer 可以应用的时间戳分配器,除非您还实现了反序列化器,FlinkKafkaConsumer 可以使用该反序列化器来生成带有时间戳的对象,然后可以提取该对象。否则,您可以选择在源之后的某处应用水印策略。
如果缺少时间戳不是问题,那么可能还有其他问题。例如,您可能有一个空闲的 Kafka 分区,或者缺少足够远的事件来关闭窗口。
顺便说一句,如果您的事件是按分区排列的,并且如果您在 FlinkKafkaConsumer 上调用 assignTimestampsAndWatermarks(您目前正在这样做),那么您可以使用forMonotonousTimestamps
而不是forBoundedOutOfOrderness
,它具有一些显着的优势。
推荐阅读
- javascript - 使用表格按钮时表格编辑的问题。所以我正在使用 crud 操作创建学生注册表。编辑遇到问题
- .net - 如何在 .NET API 中处理多个提交/发布请求?
- informatica - 如何在 informatica 中的现有 XML 生成器转换中添加新端口
- android - 有没有办法强制“显示在其他应用程序上”Android 10 Go?
- android - Flutter 中的“用于 Null 值的 Null 检查运算符”
- javascript - 使用 javascript 在 Html 中使用 Async Await 渲染 API/Json 数据
- ios - 使用 WKWebView swift 获取动态加载的 html
- node.js - 阅读 API 文档
- jpa - JPA 一对多与许多孩子
- javascript - 使用 onFocus 和 onBlur 有条件地渲染 currency-input-field 包的 currencyInput 组件的后缀,但它仅在第一次工作