scala - leftOuterJoin 抛出 TableException:不支持的连接类型“LEFT”
问题描述
我正在尝试在两个表上运行左外连接并将结果转换为 DataStream。
我在使用 flink 之前所做的所有连接都是内部连接,并且我总是在连接之后使用.toRetractStream[MyCaseClass](someQueryConfig)
. 但是,由于左连接引入了空值,我从flink 文档的理解是我不能再使用案例类,因为它们在将表转换为 DataStream 时不支持空值。
所以,我正在尝试使用 POJO 来实现这一点。这是我的代码:
class EnrichedTaskUpdateJoin(val enrichedTaskId: String, val enrichedTaskJobId: String, val enrichedTaskJobDate: String, val enrichedTaskJobMetadata: Json, val enrichedTaskStartedAt: String, val enrichedTaskTaskMetadata: Json, val taskUpdateMetadata: Json = Json.Null) {}
val qConfig = tableEnv.queryConfig
qConfig.withIdleStateRetentionTime(IDLE_STATE_RETENTION_TIME)
val updatedTasksUpsertTable = enrichedTasksUpsertTable
.leftOuterJoin(taskUpdatesUpsertTable, 'enrichedTaskId === 'taskUpdateId)
.select(
'enrichedTaskId,
'enrichedTaskJobId,
'enrichedTaskJobDate,
'enrichedTaskJobMetadata,
'enrichedTaskStartedAt,
'enrichedTaskTaskMetadata,
'taskUpdateMetadata
)
val updatedEnrichedTasksStream: KeyedStream[String, String] = updatedTasksUpsertTable
.toAppendStream[EnrichedTaskUpdateJoin](qConfig)
.map(toEnrichedTask(_))
.map(encodeTask(_))
.keyBy(x => parse(x).getOrElse(Json.Null).hcursor.get[String]("id").getOrElse(""))
这编译得很好,但是当我尝试运行它时,我得到org.apache.flink.table.api.TableException: Unsupported join type 'LEFT'. Currently only non-window inner joins with at least one equality predicate are supported
. 但是,根据这些文档,似乎我应该能够运行左连接。似乎还值得注意的是,错误是从.toAppendStream[EnrichedTaskUpdateJoin](qConfig)
. 我想也许non-window
错误的一部分暗示我的空闲状态保留时间有问题,所以我取出查询配置,但得到了同样的错误。
希望这有足够的上下文,但如果我需要添加任何其他内容,请告诉我。另外,我正在运行 flink 1.5-SNAPSHOT 和 Circe 进行 json 解析。我对 scala 也很陌生,所以这很可能只是一些愚蠢的语法错误。
解决方案
Flink 1.5-SNAPSHOT 不支持非窗口外连接。正如您在发布的链接中看到的那样,“外部连接”旁边没有“流式传输”标签。1.5 支持时间窗口连接(适用于时间属性)。
Flink 1.6 将提供LEFT
、RIGHT
和FULL
外连接(另见FLINK-5878)。
顺便提一句。确保这EnrichedTaskUpdateJoin
确实是一个 POJO,因为 POJO 需要一个默认构造函数,我认为也不var
是val
.
推荐阅读
- html - 如何在网格中制作图像标签的纵横比?
- ios - 快速使用 GTSiLib 的正确方法是什么?
- scheme - 什么决定了延续可以应用到的值的数量和类型?
- javascript - msg 对象的抽象变量——寻找循环对象
- javascript - Firestore 订购限制
- python - 比较多列中的 2 个数据帧并保存不匹配的行
- mysql - 如何仅将表的一行的内容链接到另一个表的列?
- swift - 尝试使用 StoreKit 恢复应用内购买时未收到预期的委托调用
- python - 如何将两个或多个参数传递给 Button 命令?
- c# - asp.net core 2.1中基于国家的本地化