Found more than one rowtime field for an event-time interval join

Question

I have two streams that I want to interval-join. The event types are case classes defined as follows; tradeDate is of type java.sql.Timestamp.

case class Stock(id: String, tradeDate: Timestamp, price: Double)
case class StockNameChanging(id: String, name: String, tradeDate: Timestamp)

When I run the application below, the following exception is thrown. I don't understand what it is saying or how to fix the problem.

Found more than one rowtime field: [rt1, rt2] in the table that should be converted to a DataStream.
Please select the rowtime field that should be used as event-time timestamp for the DataStream by casting all other fields to TIMESTAMP.
org.apache.flink.table.api.TableException: Found more than one rowtime field: [rt1, rt2] in the table that should be converted to a DataStream.
Please select the rowtime field that should be used as event-time timestamp for the DataStream by casting all other fields to TIMESTAMP.

The code is:

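import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment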
env.setParallelism(1)
//Stock stream
val ds1 = env.addSource(new IntervalJoinStockSource(emitInterval = 0)).assignTimestampsAndWatermarks(new StockWatermarkGenerator(4000))

//StockNameChanging stream
val ds2 = env.addSource(new IntervalJoinStockNameChangingSource(emitInterval = 0)).assignTimestampsAndWatermarks(new StockNameChangingWatermarkGenerator(4000))
val tenv = StreamTableEnvironment.create(env)
tenv.createTemporaryView("s1", ds1, $"id", $"price", $"tradeDate".rowtime() as "rt1")
tenv.createTemporaryView("s2", ds2, $"id", $"name", $"tradeDate".rowtime() as "rt2")
tenv.from("s1").printSchema()
tenv.from("s2").printSchema()
val sql =
  """
  select s1.id, s2.name, s1.price, s1.rt1, s2.rt2
  from s1 join s2
  on s1.id = s2.id
  where s1.rt1 between s2.rt2 - interval '2' second and s2.rt2 + interval '2' second

  """.stripMargin(' ')

tenv.sqlQuery(sql).toAppendStream[Row].print()
env.execute()

Tags: apache-flink, flink-sql

Answer


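Whenever Flink does event-time processing, every event needs an event-time timestamp. In Flink SQL, if you want to do event-time processing, every row must have exactly one rowtime attribute. Your query, however, produces a table with two event-time attributes, s1.rt1 and s2.rt2. The Flink SQL runtime complains because it cannot assign a unique timestamp to the rows of this result table.

Since you aren't doing any further event-time-based processing in this pipeline, you don't actually need these columns to be treated as rowtime columns, so you can CAST either one or both of them to TIMESTAMP. I believe something like this will work: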

SELECT
  s1.id, s2.name, s1.price, CAST(s1.rt1 AS TIMESTAMP) AS t1, CAST(s2.rt2 AS TIMESTAMP) AS t2
FROM
  s1 join s2
...
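
For reference, a fuller version might look like the sketch below. It assumes the join condition and interval bounds from the question are kept unchanged, and it casts only s2.rt2, so s1.rt1 stays as the single rowtime attribute in case a later step needs event time. The alias t2 is just an illustrative name, and I have not run this against the sources in the question.

```sql
-- Sketch: keep s1.rt1 as the only rowtime attribute of the result,
-- and turn s2.rt2 into a regular timestamp column.
SELECT
  s1.id,
  s2.name,
  s1.price,
  s1.rt1,                              -- remains a rowtime attribute
  CAST(s2.rt2 AS TIMESTAMP) AS t2      -- plain TIMESTAMP, no longer rowtime (alias is hypothetical)
FROM
  s1 JOIN s2
ON
  s1.id = s2.id
WHERE
  -- the interval condition still uses the original rowtime columns
  s1.rt1 BETWEEN s2.rt2 - INTERVAL '2' SECOND AND s2.rt2 + INTERVAL '2' SECOND
```

The casts go only in the SELECT list; the join predicate keeps referring to rt1 and rt2 directly, since the interval join itself relies on them being time attributes.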
