apache-flink - Flink Table API 中如何为表中的每一行分配一个唯一的 ID?
问题描述
我正在使用 Flink 来计算一系列操作。每个操作都会生成一个表,该表既用于下一个操作,也存储在 S3 中。这使得可以查看计算中每个中间步骤的数据并查看每个操作的效果。
我需要为每个表中的每一行分配一个唯一标识符,以便当该标识符在以下步骤中再次出现(可能在不同的列中)时,我知道两行相互关联。
第一个明显的候选似乎是ROW_NUMBER()
函数,但是:
它似乎不在表表达式 API 的任何地方。我必须构造 SQL 字符串吗?
我该如何使用它?当我尝试这个查询时:
SELECT *, ROW_NUMBER() OVER (ORDER BY f0) AS rn FROM inp
我收到此错误:
org.apache.flink.table.api.ValidationException: Over Agg: The window rank function without order by. please re-check the over window statement.
它总是需要对表格进行排序吗?这似乎是我宁愿避免的开销。
下一个选项只是为每一行生成一个随机 UUID。但是当我尝试这个时,相同的 UUID 永远不会被使用两次,所以它完全没用。这是一个例子:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object Sandbox {
def main(args: Array[String]): Unit = {
val env = StreamTableEnvironment.create(
StreamExecutionEnvironment.getExecutionEnvironment
)
val inp = env.fromValues(1.as("id"))
val out1 = inp.addColumns(uuid().as("u"))
val out2 = out1.addColumns($"u".as("u2"))
env.executeSql("""
CREATE TABLE out1 ( id INTEGER, u VARCHAR(36) )
WITH ('connector' = 'print')
""")
env.executeSql("""
CREATE TABLE out2 ( id INTEGER, u VARCHAR(36), u2 VARCHAR(36) )
WITH ('connector' = 'print')
""")
env.createStatementSet()
.addInsert("out1", out1)
.addInsert("out2", out2)
.execute()
// Equivalent to the createStatementSet method:
out1.executeInsert("out1")
out2.executeInsert("out2")
}
}
我得到的输出:
[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d)
[info] +I(1,55da264d-1e15-4c40-94d4-822e1cd5db9c,c9a78f93-580c-456d-9883-08bc998124ed)
我需要 UUIDout1
重新出现out2
在两列中,例如:
[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d)
[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d,4e6008ad-868a-4f95-88b0-38ee7969067d)
我想这是由于文档中的这个注释:
此函数不是确定性的,这意味着将为每条记录重新计算该值。
如何只计算一次 UUID 并使其“具体”,以便将相同的值发送到out1
和out2
?
我使用用户定义的函数得到了类似的结果:
class uuidUdf extends ScalarFunction {
def eval(): String = UUID.randomUUID().toString
}
val out1 = inp.addColumns(call(new uuidUdf()).as("u"))
解决方案
推荐阅读
- django - 在 Django 中选择哪种方式计算 PV 和 UV?
- select - 8.7 排版选择连接子查询不起作用
- sql - 使用空值或零值字段对日期和时间字段进行分组
- c# - 使用 '.NETFramework,Version=v4.6.1' 而不是项目目标框架 .NETCoreApp,Version=v2.0 恢复了包 'SassAndCoffee.Core 1.0.0'
- sql - SQL 在插入查询命令之前从其他表中获取值
- react-native - React Native 和 Flow 兼容性历史
- javascript - 如何将所有下拉菜单的值更改为所选选项?
- c# - 带有脚本的 Unity 音频错误
- javascript - 快速播放声音
- node.js - 离子以时间延迟填充列表