首页 > 解决方案 > 当连接键是bucketBy键的超集时,如何说服spark不要进行交换?

问题描述

在测试生产用例时,我创建并保存(使用 Hive Metastore)这样的表:

table1:
fields: key1, key2, value1
sortedBy key1,key2
bucketBy: key1, 100 buckets

table2:
fields: key1, key2, value2
sortedBy: key1,key2
bucketBy: key1, 100 buckets

我正在运行这样一个查询(在伪代码中)

table1.join(table2, [“key1”, “key2”])
 .groupBy(“value2”)
 .countUnique(“key1”)

常识说这个连接应该简单地用一个没有交换的排序合并连接来完成;然而spark做了一个交换然后加入。

即使对于这个特定的用例,我也可以按两个键进行存储,但由于其他一些用例,我需要按 key1 存储。当我使用这样的单个键进行(更简单)连接时:

table1.join(table2, [“key1”])

它按预期工作(即排序合并加入没有交换)。

现在我对这些表进行了优化连接,如果我想过滤,如下所示:

table1.join(table2, [“key1”])
 .filter(table1.col(“key2”) == table2.col(“key2”))

它恢复到交换然后加入。

当连接键是bucketBy键的超集时,如何说服spark不要进行交换?

笔记:

我知道的一个技巧是,如果我将重写为不等式检查,而不是相等性检查,spark 不会洗牌。

(x == y) 也可以表示为 ((x >= y) & (x <= y))。如果我在最后一个示例中应用两个这样的过滤器:

.filter(table1.col(“key2”) >= table2.col(“key2”))

.filter(table1.col(“key2”) <= table2.col(“key2”))

它将继续使用没有交换的排序合并连接,但这不是解决方案,这是一个 hack。

标签: apache-sparkjoinbuckethive-metastore

解决方案


根据一些研究和探索,这似乎是最简单的解决方案:

基于此示例:

table1.join(table2, [“key1”])
      .filter(table1.col(“key2”) == table2.col(“key2”))

而不是使用equalTo (==)来自 Spark 的,实现自定义MyEqualTo(通过委托给 sparkEqualTo实现很好)似乎解决了这个问题。这样,spark 不会优化(!)连接,它只会将过滤器拉到 SortMergeJoin 中。

类似地,连接条件也可以这样形成:

(table1.col(“key1”) == table2.col(“key1”)) AND
table1.col(“key2”).myEqualTo(table2.col(“key2”))

推荐阅读