首页 > 解决方案 > Flink SQL:Outer Join with Group By 给出了意外的输出

问题描述

我有两个 Flink 动态表EventConfiguration.

Event有结构:[id, myTimestamp]并且Configuration有结构:id, myValue, myTimestamp

我正在尝试执行返回的 Flink SQL 查询Event.id, Configuration.myValue,或者Event.id, null如果该Event行与任何fromid不匹配。idConfiguration

预期行为示例(Event开始为Configuration空):

该示例必须阅读为:

[DATA_RECEIVED] => TARGET_TABLE : EXPECTED_OUTPUT

由于 SQL Query 是由连接制成的,因此它被插入到一个UpsertSink(输出的第一个值对应于 upsert 布尔值)

[myId-1, 10]            => EventTable           : [(true, myId-1, null)]
[myId-1, myValue-A, 15] => ConfigurationTable   : [(false, myId-1, null), (true, myId-1, myValue-A)]
[myId-1, myValue-A, 20] => ConfigurationTable   : [(false, myId-1, myValue-A), (true, myId-1, myValue-A)]
[myId-1, myValue-B, 25] => ConfigurationTable   : [(false, myId-1, myValue-A), (true, myId-1, myValue-B)]
[myId-1, 30]            => EventTable           : [(false, myId-1, null), (true, myId-1, myValue-B)]

所以我做了这个查询:

SELECT
   Event.id,
   Configuration.myValue
FROM
  (SELECT id, MAX(myTimestamp) as myTimestamp FROM Event GROUP BY id) as Event
    LEFT JOIN
  (SELECT id, LATEST_VAL(myValue, myTimestamp) as myValue, MAX(myTimestamp) as myTimestamp FROM Configuration GROUP BY id, myValue) as Configuration
    ON Event.id = Configuration.id
GROUP BY Event.id, Configuration.myValue

哪里LATEST_VAL是返回myValue关联的 UDF MAX(myTimestamp)

但我有我不理解的行为。以下是观察到的结果:

[myId-1, 10]            => EventTable           : [(true, myId-1, null)] // OK
[myId-1, myValue-A, 15] => ConfigurationTable   : [(false, myId-1, null), (true, myId-1, myValue-A)] // OK
[myId-1, myValue-A, 20] => ConfigurationTable   : [(false, myId-1, myValue-A), (true, myId-1, null), (false, myId-1, null), (true, myId-1, myValue-A)] // NOT OK
[myId-1, myValue-B, 25] => ConfigurationTable   : [(false, myId-1, myValue-A), (true, myId-1, null), (false, myId-1, null), (true, myId-1, myValue-B)] // NOT OK
[myId-1, 30]            => EventTable           : [(false, myId-1, null), (true, myId-1, myValue-B)] // OK

您如何解释预期行为和观察到的行为之间的区别?为什么会有额外的输出(true, myId-1, null), (false, myId-1, null)

是否可以调整 SQL 查询以获得想要的行为?

笔记 :

标签: sqlapache-flinkflink-sql

解决方案


我认为您错过的一点是您实际上加入了两个撤回流。即使您的输入流是仅附加流,您也会在产生撤回的子查询中对它们执行聚合。

我们首先分析子查询的结果:

子查询 1:

Query: SELECT id, MAX(myTimestamp) as myTimestamp FROM Event GROUP BY id

Resulting stream:
 (true, myId-1, 10L)
 (false, myId-1, 10L)
 (true, myId-1, 30L)

子查询 2:

Query: SELECT id, LATEST_VAL(myValue, myTimestamp) as myValue, MAX(myTimestamp) as myTimestamp FROM Configuration GROUP BY id, myValue

Resulting stream:
 (true, "myId-1", "myValue-A", 15L)
 (false, "myId-1", "myValue-A", 15L)
 (true, "myId-1", "myValue-A", 20L)
 (false, "myId-1", "myValue-A", 20L)
 (true, "myId-1", "myValue-B", 25L)

之后,您在这两个撤回流之上执行连接和分组。考虑到这一点,您的示例中实际加入和分组的是:

[true, myId-1, 10]             : [(true, myId-1, null)]
[true, myId-1, myValue-A, 15]  : [(false, myId-1, null), (true, myId-1, myValue-A)]
[false, myId-1, myValue-A, 15] : [(false, myId-1, myValue-A), (true, myId-1, null)]
[true, myId-1, myValue-A, 20]  : [(false, myId-1, null), (true, myId-1, myValue-A)]
[false, myId-1, myValue-A, 20] : [(false, myId-1, myValue-A), (true, myId-1, null)]
[true, myId-1, myValue-B, 25]  : [(false, myId-1, null), (true, myId-1, myValue-B)]
...

总的来说,据我所知,它会产生正确的结果。对于每个输入行,最后发出的行表示与给定 id 对应的最新值。


推荐阅读