apache-spark - 当连接键是bucketBy键的超集时,如何说服spark不要进行交换?
问题描述
在测试生产用例时,我创建并保存(使用 Hive Metastore)这样的表:
table1:
fields: key1, key2, value1
sortedBy key1,key2
bucketBy: key1, 100 buckets
table2:
fields: key1, key2, value2
sortedBy: key1,key2
bucketBy: key1, 100 buckets
我正在运行这样一个查询(在伪代码中)
table1.join(table2, [“key1”, “key2”])
.groupBy(“value2”)
.countUnique(“key1”)
常识说这个连接应该简单地用一个没有交换的排序合并连接来完成;然而spark做了一个交换然后加入。
即使对于这个特定的用例,我也可以按两个键进行存储,但由于其他一些用例,我需要按 key1 存储。当我使用这样的单个键进行(更简单)连接时:
table1.join(table2, [“key1”])
它按预期工作(即排序合并加入没有交换)。
现在我对这些表进行了优化连接,如果我想过滤,如下所示:
table1.join(table2, [“key1”])
.filter(table1.col(“key2”) == table2.col(“key2”))
它恢复到交换然后加入。
当连接键是bucketBy键的超集时,如何说服spark不要进行交换?
笔记:
我知道的一个技巧是,如果我将重写为不等式检查,而不是相等性检查,spark 不会洗牌。
(x == y) 也可以表示为 ((x >= y) & (x <= y))。如果我在最后一个示例中应用两个这样的过滤器:
.filter(table1.col(“key2”) >= table2.col(“key2”))
.filter(table1.col(“key2”) <= table2.col(“key2”))
它将继续使用没有交换的排序合并连接,但这不是解决方案,这是一个 hack。
解决方案
根据一些研究和探索,这似乎是最简单的解决方案:
基于此示例:
table1.join(table2, [“key1”])
.filter(table1.col(“key2”) == table2.col(“key2”))
而不是使用equalTo (==)
来自 Spark 的,实现自定义MyEqualTo
(通过委托给 sparkEqualTo
实现很好)似乎解决了这个问题。这样,spark 不会优化(!)连接,它只会将过滤器拉到 SortMergeJoin 中。
类似地,连接条件也可以这样形成:
(table1.col(“key1”) == table2.col(“key1”)) AND
table1.col(“key2”).myEqualTo(table2.col(“key2”))
推荐阅读
- reactjs - 您在 a 上指定了 `onScroll`
但不是`scrollEventThrottle` - ssh - Spinnaker 使用 ssh-tunnel 从 localhost 访问 UI
- python - 我如何每 30 秒重复一次推送通知
- sql - 如果值小于则按 ID 计数并将值设置为 1
- java - Is it a good practice to make one Spring bean inherit another one?
- c# - 在 C# 控制台应用程序中的 System.Net.Http 中启用 cookie?
- json - 将多种类型的数据写入列表以供多个线程访问
- kernel - iOS 和 Mac 上的 Metal 内核有什么不同吗?
- matlab - 3D 模型的 2D 投影
- javascript - JavaScript:window.top 和 top.window 有什么区别