首页 > 解决方案 > spark中的性能调优

问题描述

我正在运行一个处理大约 2 TB 数据的 spark 作业。处理包括:

  1. 读取数据(avrò 文件)
  2. 在属于地图类型的列上展开
  3. OrderBy分解列中的键
  4. 过滤DataFrame(我有一个非常小的(7)组键(称为键集),我想过滤df)。我做一个df.filter(col("key").isin(keyset: _*) )
  5. 我将此 df 写入镶木地板(此数据框非常小)
  6. 然后我再次过滤原始数据帧以获取不在密钥集中的所有密钥 df.filter(!col("key").isin(keyset: _*) )并将其写入镶木地板。这是更大的数据集。

原始的 avro 数据约为 2TB。处理大约需要 1 小时。我想优化它。我在第 3 步之后缓存数据帧,使用 6000 的随机分区大小。min executors = 1000,max = 2000,executor memory = 20 G,executor core = 2。还有其他优化建议吗?左连接会比过滤器性能更好吗?

标签: scalaperformanceapache-spark

解决方案


在我看来一切都是正确的。如果你有小数据集,那isin没关系。

1)确保您可以增加核心数量。执行器核心=5

不建议每个执行器使用超过 5 个核心。这是基于一项研究,其中任何具有超过 5 个并发线程的应用程序都会开始影响性能。

2)确保您有良好/统一的分区结构。

示例(仅用于调试目的,不用于生产):

  import org.apache.spark.sql.functions.spark_partition_id
  yourcacheddataframe.groupBy(spark_partition_id).count.show()

这将打印 spark 分区号以及每个分区中存在多少条记录。基于此,您可以重新分区,如果您不想要更多并行性。

3)spark.dynamicAllocation.enabled可能是另一种选择。

例如 :

spark-submit --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=100 --conf spark.shuffle.service.enabled=true

连同所有其他必需的道具……那是那项工作。如果您在 spark-default.conf 中提供这些道具,它将适用于所有工作。

使用所有这些上述选项,您的处理时间可能会降低。


推荐阅读