scala - spark中的性能调优
问题描述
我正在运行一个处理大约 2 TB 数据的 spark 作业。处理包括:
- 读取数据(avrò 文件)
- 在属于地图类型的列上展开
OrderBy
分解列中的键- 过滤DataFrame(我有一个非常小的(7)组键(称为键集),我想过滤df)。我做一个
df.filter(col("key").isin(keyset: _*) )
- 我将此 df 写入镶木地板(此数据框非常小)
- 然后我再次过滤原始数据帧以获取不在密钥集中的所有密钥
df.filter(!col("key").isin(keyset: _*) )
并将其写入镶木地板。这是更大的数据集。
原始的 avro 数据约为 2TB。处理大约需要 1 小时。我想优化它。我在第 3 步之后缓存数据帧,使用 6000 的随机分区大小。min executors = 1000,max = 2000,executor memory = 20 G,executor core = 2。还有其他优化建议吗?左连接会比过滤器性能更好吗?
解决方案
在我看来一切都是正确的。如果你有小数据集,那isin
没关系。
1)确保您可以增加核心数量。执行器核心=5
不建议每个执行器使用超过 5 个核心。这是基于一项研究,其中任何具有超过 5 个并发线程的应用程序都会开始影响性能。
2)确保您有良好/统一的分区结构。
示例(仅用于调试目的,不用于生产):
import org.apache.spark.sql.functions.spark_partition_id
yourcacheddataframe.groupBy(spark_partition_id).count.show()
这将打印 spark 分区号以及每个分区中存在多少条记录。基于此,您可以重新分区,如果您不想要更多并行性。
3)spark.dynamicAllocation.enabled
可能是另一种选择。
例如 :
spark-submit --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=100 --conf spark.shuffle.service.enabled=true
连同所有其他必需的道具……那是那项工作。如果您在 spark-default.conf 中提供这些道具,它将适用于所有工作。
使用所有这些上述选项,您的处理时间可能会降低。
推荐阅读
- ios - 如何检查响应编码值是否为空?
- ionic-framework - Ionic 4 协议缓冲区
- python - 用 Python 更新 MySQL 列
- spring-boot - 如何在springboot的h2数据库中自动生成一个id?
- python-3.x - 将日期与 datetime64[ns] 进行比较 - Pandas
- python - CLibraryError: 解析单元数据库时出错
- python - "flatten: 每一行 pandas Dataframe
- structure - String 作为 Union 的成员
- android - RecyclerView / DiffUtils 动画当数据集更改而没有完全刷新
- android - 如何获取 .keystore 文件以生成加密您的私钥