首页 > 解决方案 > 将 spark 数据拆分为分区并将这些分区并行写入磁盘

问题描述

问题大纲:假设我在 AWS 的 EMR 集群上使用 spark 处理了 300+ GB 的数据。该数据具有三个属性,用于在文件系统上进行分区以在 Hive 中使用:日期、小时和(比方说)anotherAttr。我想以最小化写入文件数量的方式将此数据写入 fs。

我现在正在做的是获取日期、小时、anotherAttr 的不同组合以及组成组合的行数。我将它们收集到驱动程序上的一个列表中,并遍历该列表,为每个组合构建一个新的 DataFrame,使用行数重新分区该 DataFrame 以推测文件大小,并使用 DataFrameWriter 将文件写入磁盘,.orc完成它。

出于组织原因,我们没有使用 Parquet。

这种方法效果不错,解决了下游团队使用 Hive 而不是 Spark 看不到大量文件导致的性能问题的问题。例如,如果我使用整个 300 GB 数据帧,对 1000 个分区(在 spark 中)和相关列进行重新分区,然后将其转储到磁盘,所有这些都并行转储,并在大约 9 分钟内完成整个事情。但这会为较大的分区提供多达 1000 个文件,这会破坏 Hive 的性能。或者它会破坏某种性能,老实说不是 100% 确定是什么。我刚刚被要求尽量减少文件数量。使用我正在使用的方法,我可以将文件保持在我想要的任何大小(无论如何都相对接近),但是没有并行性,运行大约需要 45 分钟,主要是等待文件写入。

在我看来,由于某些源行和某些目标行之间存在一对一的关系,并且由于我可以将数据组织到不重叠的“文件夹”(Hive 的分区)中,所以我应该能够组织我的代码/数据帧以这样一种方式,我可以要求 spark 并行编写所有目标文件。有人对如何攻击这个有建议吗?

我测试过的东西不起作用:

  1. 使用 Scala 并行集合开始写入。无论 spark 对 DataFrame 做什么,它都没有很好地分离任务,并且一些机器遇到了大量的垃圾收集问题。

  2. DataFrame.map - 我试图映射唯一组合的 DataFrame,并从那里开始写入,但无法从其中访问我实际需要的数据map的 DataFrame - DataFrame 引用在执行程序上为空。

  3. DataFrame.mapPartitions - 一个非入门者,无法从 mapPartitions 内部提出任何想法来做我想做的事情

“分区”这个词在这里也不是特别有用,因为它既指 spark 按某些标准拆分数据的概念,也指 Hive 在磁盘上组织数据的方式。我想我在上面的用法中很清楚。因此,如果我想为这个问题提供一个完美的解决方案,那就是我可以创建一个基于三个属性的具有 1000 个分区的 DataFrame 以进行快速查询,然后从中创建另一个 DataFrame 集合,每个 DataFrame 都具有一个独特的组合这些属性,重新分区(在 spark 中,但对于 Hive),分区数与其包含的数据大小相适应。大多数 DataFrame 将有 1 个分区,少数将有多达 10 个。文件应该是 ~3 GB,我们的 EMR 集群的 RAM 比每个执行程序的 RAM 多,

一旦创建了 DataFrame 列表并且每个都重新分区,我可以要求 spark 将它们全部并行写入磁盘。

像这样的东西在火花中可能吗?

我在概念上不清楚的一件事:说我有

val x = spark.sql("select * from source")

val y = x.where(s"date=$date and hour=$hour and anotherAttr=$anotherAttr")

val z = x.where(s"date=$date and hour=$hour and anotherAttr=$anotherAttr2")

与 DataFrame 有多大y不同z?如果我重新分区,y洗牌有什么影响?zx

标签: parallel-processingapache-spark-sqlorc

解决方案


这个说法:

我将它们收集到驱动程序上的一个列表中,并遍历列表,为每个组合构建一个新的 DataFrame,使用行数重新分区该 DataFrame 以推测文件大小,并使用 DataFrameWriter 将文件写入磁盘,.orc 完成它离开。

就 Spark 而言,它完全偏离光束。收集到驱动程序从来都不是一个好方法,体积和 OOM 问题以及您的方法中的延迟很高。

使用以下命令可以简化并获得 Spark 的并行性,从而为您的老板节省时间和金钱:

df.repartition(cols...)...write.partitionBy(cols...)...

洗牌是通过发生repartition的,从来没有洗牌过partitionBy

就这么简单,使用 Spark 的默认并行性。


推荐阅读