首页 > 解决方案 > Flink Table API 中如何为表中的每一行分配一个唯一的 ID?

问题描述

我正在使用 Flink 来计算一系列操作。每个操作都会生成一个表,该表既用于下一个操作,也存储在 S3 中。这使得可以查看计算中每个中间步骤的数据并查看每个操作的效果。

我需要为每个表中的每一行分配一个唯一标识符,以便当该标识符在以下步骤中再次出现(可能在不同的列中)时,我知道两行相互关联。

第一个明显的候选似乎是ROW_NUMBER()函数,但是:

  1. 它似乎不在表表达式 API 的任何地方。我必须构造 SQL 字符串吗?

  2. 我该如何使用它?当我尝试这个查询时:

    SELECT *, ROW_NUMBER() OVER (ORDER BY f0) AS rn FROM inp

    我收到此错误:

    org.apache.flink.table.api.ValidationException: Over Agg: The window rank function without order by. please re-check the over window statement.

  3. 它总是需要对表格进行排序吗?这似乎是我宁愿避免的开销。

下一个选项只是为每一行生成一个随机 UUID。但是当我尝试这个时,相同的 UUID 永远不会被使用两次,所以它完全没用。这是一个例子:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Sandbox {
  def main(args: Array[String]): Unit = {

    val env = StreamTableEnvironment.create(
      StreamExecutionEnvironment.getExecutionEnvironment
    )

    val inp = env.fromValues(1.as("id"))
    val out1 = inp.addColumns(uuid().as("u"))
    val out2 = out1.addColumns($"u".as("u2"))

    env.executeSql("""
      CREATE TABLE out1 ( id INTEGER, u VARCHAR(36) )
      WITH ('connector' = 'print')
    """)

    env.executeSql("""
      CREATE TABLE out2 ( id INTEGER, u VARCHAR(36), u2 VARCHAR(36) )
      WITH ('connector' = 'print')
    """)

    env.createStatementSet()
      .addInsert("out1", out1)
      .addInsert("out2", out2)
      .execute()

    // Equivalent to the createStatementSet method:
    out1.executeInsert("out1")
    out2.executeInsert("out2")
  }
}

我得到的输出:

[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d)
[info] +I(1,55da264d-1e15-4c40-94d4-822e1cd5db9c,c9a78f93-580c-456d-9883-08bc998124ed)

我需要 UUIDout1重新出现out2在两列中,例如:

[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d)
[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d,4e6008ad-868a-4f95-88b0-38ee7969067d)

我想这是由于文档中的这个注释:

此函数不是确定性的,这意味着将为每条记录重新计算该值。

如何只计算一次 UUID 并使其“具体”,以便将相同的值发送到out1out2

我使用用户定义的函数得到了类似的结果:

    class uuidUdf extends ScalarFunction {
      def eval(): String = UUID.randomUUID().toString
    }

    val out1 = inp.addColumns(call(new uuidUdf()).as("u"))

标签: apache-flinkflink-streamingflink-sqlflink-table-api

解决方案


推荐阅读