scala - Spark Dataframe Join shuffle
问题描述
星火版本 1.6.0
我在两个具有 100 个分区的数据帧之间使用连接函数,该应用程序在一个集群上运行,我为每个 20 个执行程序使用 5 个内核,总共 100 个内核。
我的问题是,当我进行连接时,所有记录都在一个执行器上计算,而其他执行器不使用,如下图所示:
这会导致性能下降,因为所有数据都是使用一个执行程序针对其他 19 个可用执行程序计算的。
看起来 spark join 只在一个分区中“带来”所有记录,有没有办法避免这种情况?
为了确保它不会重新分配到 1,我还设置了这个 spark 属性:spark.sql.shuffle.partitions=100
确实,两个输入数据帧有 100 个与输出数据帧相同的分区
解决方案
简短的回答:
这是因为您的数据,而不是因为火花。
长答案:
为了执行join
操作,火花需要将具有相同键(您要加入的列的值)的数据移动到相同的工作人员。例如,如果您将 A 列与 B 列连接,则两个表中包含相同值的行将被移动到相同的工作人员,然后再连接。
此外 - 具有不同键的行也可能移动到同一个节点 - 这取决于您拥有的分区器。您可以在此处阅读更多信息- 但一般认为默认分区器存在 -HashPartitioner
和RangePartitioner
. 尽管使用了哪一个 - 它决定了哪个工人行。例如 - 如果您有 RangePartitioner 范围为 [0, 5)[5. 7)[7, 10] 然后键 1, 2, 3, 4 将全部交给同一个工人。如果您的数据中只有这些键 - 将只使用一名工作人员。
推荐阅读
- vb.net - 尝试在 VS 2013 中打开但在 VS 2008 和 2005 中工作正常时,Crystal Report 错误缺少参数
- here-api - geocoder api 结果如何排序?
- dart - 我无法添加 Text(installedApps[index]["app_name"]) 因为 'index' 没有定义。如何添加此文本?
- javascript - 如何在Angular 2中过滤具有多个复选框的范围滑块的结果?
- mongodb - 如何使用相对于纪元时间的时间跨度过滤 mongo db 文档?
- python-3.x - 生成具有一些保证存在的半随机数集
- python - 基本 Python 问题:向后切片 (-1) 的行为因字符串长度而异。为什么?
- opencv - 我的 CBIR 系统结果如此糟糕是否正常?
- java - 这种方法标准是否使用 varags.length 而不是布尔值?
- android - 我应该将 BroadcastReceiver 或 Service 与警报管理器一起使用吗?