java - Apache Flink Table 查询结果为字符串值
问题描述
我正在从 flink table api 编写查询以检索记录。然后检查是否找到记录,如果找到,则获取每个记录列值的字符串值。
IE
users:
|id | name | phone |
|---|------|-------|
| 01| sam | 23354 |
| 02| jake | 23352 |
| 03| kim | 23351 |
问题是 flink 仅从查询中返回表,所以我无法 1:检查是否找到记录 2:获取找到的记录值的各个值
须藤代码:
foundRecord = find record by phone
if foundRecord {
create new instance of Visitor
Visitor.name = foundRecord.name
Visitor.id = foundRecord.id
} else {
throw exception
}
flink docs 推荐的下面的代码给了我一个表,但不确定如何实现上面的 sudo 代码,因为它作为另一个表返回,我需要实际的记录值。
Table users = registeredUsers.select("id, name, phone").where("phone === '23354'"));
参考的 Flink 文档:https ://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#expression-syntax
解决方案
为了知道找不到匹配的记录,输入必须是有界的——所以我们将使用 a BatchTableEnvironment
,而不是 a StreamTableEnvironment
。(使用流输入,匹配的记录最终可能会到达并满足查询。只有使用批量输入,我们才能证明不存在匹配。)
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.util.Collector
class MissingResultException() extends Exception {}
object Phone {
case class Visitor(name: String, id: String)
@throws[Exception]
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)
val rawInput = env.fromElements(
("01", "sam", "23354"),
("02", "jake", "23352"),
("03", "kim", "23351"))
val events = tableEnv.fromDataSet(rawInput, 'id, 'name, 'phone)
tableEnv.registerTable("events", events)
val resultTable = tableEnv
.from("events")
.select('id, 'name, 'phone)
.where("phone === 'missing'")
val results = resultTable.toDataSet[Row]
results
.map(row => new Visitor(row.getField(1).toString, row.getField(0).toString))
.print
val count: DataSet[Long] = env.fromElements(results.count())
count
.flatMap(new FlatMapFunction[Long, Collector[Long]]{
override def flatMap(x: Long, collector: Collector[Collector[Long]]): Unit = {
if (x == 0L) {
throw new MissingResultException
}
}})
.print()
}
}
我用来检测结果集为空的方法感觉像是一种黑客攻击,但我想不出更好的方法。请注意,print()
最后的 是必要的,尽管没有什么要打印的,因为任何最终没有馈送到接收器的计算都将被优化掉,而不是执行。
推荐阅读
- android - AutocompleteTextView 到 EditText 切换导致一些 UI 错误
- javascript - 解决网页中的 not a function 错误
- google-cloud-platform - 如何让 NetBSD 虚拟机在谷歌计算引擎中自行停止
- awk - 如何使用awk按顺序打印数据?
- angular - NGRX/数据实体 getAll 将旧数据与新数据连接起来,而不是更新
- sql - 这个符号 '<>' 在 Laravel Query Builder 中是什么意思?
- asynchronous - 如何在 RxJs 中异步抛出错误?
- node.js - 为什么卷中的 vendor/node_modules 映射被认为是不好的做法?
- google-sheets - Arrayformula 连接字符串 - 它会减慢我的工作表吗?
- python - 由祖父母选择父母的最佳方式......在sqlalchemy中