首页 > 解决方案 > 当左侧数据较大(~1B 记录)且具有幂律且右侧数据较小(但 > 2GB)时,我是否应该启用 shufflehashjoin

问题描述

我有一个非常大的数据集,有 3.5 亿到 10 亿条记录,具体取决于批次。在右侧,我有一个小得多的数据集,通常大小为 1000 万左右,而不是更多。我不能简单地广播右侧(有时它会超过 8BG,这是一个硬限制)。最重要的是,我的左侧在连接键上有幂律分布。

我尝试通过添加随机盐来随机爆炸右侧键的技巧,以对抗左侧的幂律分布。

这可以按预期工作,但是对于偶尔的批处理,我会遇到内存超出限制的容器故障(19GB 中的 19.5GB)。每个执行程序我只能达到 17GB + 2GB 开销。我尝试减少内核以使每个线程拥有更多内存,但仍然会发生同样的问题。该问题每 50 批左右发生 2 或 3 次。当作业从故障点重新启动时,相同的批次会正确运行。

连接的右侧是通过广播连接将小数据连接到中等大小的数据而产生的,连接的较大一侧是检查点,以便在发生错误时节省时间。

val x = larger.join(broadcast(smaller), Seq("col1", "col2", ...), "left")

通过将非常大的数据连接到 x 来获得结果。

val res = very_large.join(x, Seq("col2", "col3", ....), "left_outter").where(condition)

我的问题是,在这种情况下,重新启用(默认禁用)shuffle hash join 是否是一个更好的选择。我的理解是,鉴于我的右侧比连接的左侧小得多,因此随机播放连接可能是比排序合并连接(默认启用)更好的选择。

我使用 spark 2.3(由于平台限制无法升级)。我确实有一些自定义催化剂表达式,但它们已经过测试,并且不会在其他工作中崩溃。// 我列出这个只是为了同谋。

注意:由于 IP,我无法粘贴代码示例。

标签: apache-spark-2.3

解决方案


推荐阅读