apache-flink - Found more than one rowtime field for an event-time interval join
Problem description
I have two streams that I want to interval-join. The event types are the case classes defined below; tradeDate is of type java.sql.Timestamp:
case class Stock(id: String, tradeDate: Timestamp, price: Double)
case class StockNameChanging(id: String, name: String, tradeDate: Timestamp)
When I run the application shown below, it throws the following exception. I don't understand what it means or how to fix it.
org.apache.flink.table.api.TableException: Found more than one rowtime field: [rt1, rt2] in the table that should be converted to a DataStream.
Please select the rowtime field that should be used as event-time timestamp for the DataStream by casting all other fields to TIMESTAMP.
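The code is:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
val env = StreamExecutionEnvironment.getExecutionEnvironment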
env.setParallelism(1)
//Stock stream
val ds1 = env.addSource(new IntervalJoinStockSource(emitInterval = 0)).assignTimestampsAndWatermarks(new StockWatermarkGenerator(4000))
//StockNameChanging stream
val ds2 = env.addSource(new IntervalJoinStockNameChangingSource(emitInterval = 0)).assignTimestampsAndWatermarks(new StockNameChangingWatermarkGenerator(4000))
val tenv = StreamTableEnvironment.create(env)
tenv.createTemporaryView("s1", ds1, $"id", $"price", $"tradeDate".rowtime() as "rt1")
tenv.createTemporaryView("s2", ds2, $"id", $"name", $"tradeDate".rowtime() as "rt2")
tenv.from("s1").printSchema()
tenv.from("s2").printSchema()
val sql =
"""
select s1.id, s2.name, s1.price, s1.rt1, s2.rt2
from s1 join s2
on s1.id = s2.id
where s1.rt1 between s2.rt2 - interval '2' second and s2.rt2 + interval '2' second
""".stripMargin(' ')
tenv.sqlQuery(sql).toAppendStream[Row].print()
env.execute()
Solution
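Whenever Flink does event-time processing, every event needs an event-time timestamp. With Flink SQL, if you want event-time processing, each row must have a rowtime attribute, and exactly one. However, your query creates a table with two event-time attributes, s1.rt1 and s2.rt2, because an interval join keeps the time attributes of both inputs in its result. Flink complains because, when converting this result table to a DataStream, it cannot assign a unique timestamp to its rows.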
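Since you aren't doing any further event-time processing in this pipeline, you don't actually need these columns to be treated as rowtime columns, so you can CAST either one or both of them to TIMESTAMP. I believe something like this will work: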
SELECT
s1.id, s2.name, s1.price, CAST(s1.rt1 AS TIMESTAMP) AS t1, CAST(s2.rt2 AS TIMESTAMP) AS t2
FROM
s1 join s2
...
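A fuller sketch of that fix follows. It assumes Flink 1.12 or later (before 2.0) with the Scala DataStream and Table bridge APIs; some of these methods, such as createTemporaryView with field expressions and toAppendStream, are deprecated in newer releases. It replaces the question's custom sources and watermark generators with env.fromElements and WatermarkStrategy.forBoundedOutOfOrderness (4 seconds, mirroring the 4000 passed to the question's generators). The object name, the sample rows and the joinQuery helper are hypothetical. Two variants run side by side: casting both columns, and casting only rt2 so that rt1 remains the single rowtime attribute. In both, the WHERE clause still compares the uncast rt1 and rt2, so the planner keeps treating the query as an interval join; only the projected columns are cast.

```scala
import java.sql.Timestamp
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

case class Stock(id: String, tradeDate: Timestamp, price: Double)
case class StockNameChanging(id: String, name: String, tradeDate: Timestamp)

// Hypothetical example object; not part of the original question or answer.
object IntervalJoinCastExample {

  // Builds the interval-join query. castBoth = true: neither output column is a time
  // attribute. castBoth = false: only rt2 is cast, so rt1 stays the single rowtime column.
  def joinQuery(castBoth: Boolean): String = {
    val t1 = if (castBoth) "CAST(s1.rt1 AS TIMESTAMP)" else "s1.rt1"
    s"""
       |SELECT s1.id, s2.name, s1.price, $t1 AS t1, CAST(s2.rt2 AS TIMESTAMP) AS t2
       |FROM s1 JOIN s2 ON s1.id = s2.id
       |WHERE s1.rt1 BETWEEN s2.rt2 - INTERVAL '2' SECOND AND s2.rt2 + INTERVAL '2' SECOND
       |""".stripMargin
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Assumption: in-memory sample rows instead of the question's custom sources.
    val stocks = env
      .fromElements(
        Stock("A", Timestamp.valueOf("2020-01-01 00:00:01"), 10.0),
        Stock("B", Timestamp.valueOf("2020-01-01 00:00:10"), 20.0))
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[Stock](Duration.ofSeconds(4))
          .withTimestampAssigner(new SerializableTimestampAssigner[Stock] {
            override def extractTimestamp(s: Stock, prev: Long): Long = s.tradeDate.getTime
          }))

    val names = env
      .fromElements(
        StockNameChanging("A", "Alpha", Timestamp.valueOf("2020-01-01 00:00:02")),
        StockNameChanging("B", "Beta", Timestamp.valueOf("2020-01-01 00:00:20")))
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[StockNameChanging](Duration.ofSeconds(4))
          .withTimestampAssigner(new SerializableTimestampAssigner[StockNameChanging] {
            override def extractTimestamp(n: StockNameChanging, prev: Long): Long =
              n.tradeDate.getTime
          }))

    val tenv = StreamTableEnvironment.create(env)
    // Same view definitions as in the question: tradeDate becomes the rowtime attribute.
    tenv.createTemporaryView("s1", stocks, $"id", $"price", $"tradeDate".rowtime.as("rt1"))
    tenv.createTemporaryView("s2", names, $"id", $"name", $"tradeDate".rowtime.as("rt2"))

    // Each result has at most one rowtime column, so the conversion is allowed.
    tenv.sqlQuery(joinQuery(castBoth = true)).toAppendStream[Row].print("cast-both")
    tenv.sqlQuery(joinQuery(castBoth = false)).toAppendStream[Row].print("keep-rt1")

    env.execute("interval join with at most one rowtime column")
  }
}
```

Casting only one column is the variant to prefer if something downstream of toAppendStream still needs event time (windows, further joins), because the resulting DataStream records then carry rt1 as their timestamp. Casting both is enough when the result is only printed or written out, as in the question.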