java - Apache flink 1.52 Rowtime 时间戳为空
问题描述
我正在使用以下代码进行一些查询:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Row> ds = SourceHelp.builder().env(env).consumer010(MyKafka.builder().build().kafkaWithWaterMark2())
.rowTypeInfo(MyRowType.builder().build().typeInfo())
.build().source4();
//,proctime.proctime,rowtime.rowtime
String sql1 = "select a,b,max(rowtime)as rowtime from user_device group by a,b";
DataStream<Row> ds2 = TableHelp.builder().tableEnv(tableEnv).tableName("user_device").fields("a,b,rowtime.rowtime")
.rowTypeInfo(MyRowType.builder().build().typeInfo13())
.sql(sql1).in(ds).build().result();
ds2.print();
// String sql2 = "select a,count(b) as b from user_device2 group by a";
String sql2 = "select a,count(b) as b,HOP_END(rowtime,INTERVAL '5' SECOND,INTERVAL '30' SECOND) as c from user_device2 group by HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '30' SECOND),a";
DataStream<Row> ds3 = TableHelp.builder().tableEnv(tableEnv).tableName("user_device2").fields("a,b,rowtime.rowtime")
.rowTypeInfo(MyRowType.builder().build().typeInfo14())
.sql(sql2).in(ds2).build().result();
ds3.print();
env.execute("test");
注意:对于 sql1,我将 max 函数与 rowtime 一起使用,它不起作用,并引发以下异常:
线程“主”org.apache.flink.runtime.client.JobExecutionException 中的异常:java.lang.RuntimeException:行时间时间戳为空。请确保定义了正确的 TimestampAssigner 并且流环境使用 EventTime 时间特性。在 org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625) 在 org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) 在 com.aicaigroup.water .WaterTest.testRowtimeWithMoreSqls5(WaterTest.java:158) at com.aiicaigroup.water.WaterTest.main(WaterTest.java:20) 原因:java.lang.RuntimeException:Rowtime 时间戳为空。请确保定义了正确的 TimestampAssigner 并且流环境使用 EventTime 时间特性。
然后我尝试像这样“从user_device中选择a,b,rowtime”来更新sql1,它可以工作。那么如何修复错误呢?第一个 sql 应该使用 group by,第二个 sql 应该使用 rowtime by timeWindow。3QS
解决方案
我从 1.6 开始 flink ,遇到类似的问题。通过这些步骤解决:
- 使用 assignTimestampsAndWatermarks ,只需使用默认和正常实现 BoundedOutOfOrdernessTimestampExtractor。您需要编写 extractTimestamp 函数来提取时间戳值并在构造函数中声明窗口间隔。
- 在字段末尾附加 ,proctime.proctime,rowtime.rowtime (我使用 fromDataStream(Flink 1.6) 将流转换为表)
- 如果您想使用现有字段作为行时间。例如数据源字段为 "a,clicktime,c" ,可以声明 "a,clicktime.rowtime,c"
希望它可以帮助你。
推荐阅读
- javascript - 无法使用 javascript 从 AWS S3 存储桶获取文件计数或列表
- android - 我可以在 Google Play 控制台的哪个位置查看我的应用的 Android 版本分布?
- sequelize.js - Sequelize - 包含模块,而 where 子句是可选的
- r - 如何仅使用非 NA 值为每一行数据帧制作 bin?
- android - 实现 .aar 库和上下文问题
- c# - 如何有条件地在linq中添加where条件
- python - Python selenium - 找不到元素
- excel - Excel:动态相关列表
- weka - WEKA Ham 或 Spam J48 显示为灰色
- python - 我的输出如何显示真实的列标题而不是自动增量数字