parallel-processing - 将 spark 数据拆分为分区并将这些分区并行写入磁盘
问题描述
问题大纲:假设我在 AWS 的 EMR 集群上使用 spark 处理了 300+ GB 的数据。该数据具有三个属性,用于在文件系统上进行分区以在 Hive 中使用:日期、小时和(比方说)anotherAttr。我想以最小化写入文件数量的方式将此数据写入 fs。
我现在正在做的是获取日期、小时、anotherAttr 的不同组合以及组成组合的行数。我将它们收集到驱动程序上的一个列表中,并遍历该列表,为每个组合构建一个新的 DataFrame,使用行数重新分区该 DataFrame 以推测文件大小,并使用 DataFrameWriter 将文件写入磁盘,.orc
完成它。
出于组织原因,我们没有使用 Parquet。
这种方法效果不错,解决了下游团队使用 Hive 而不是 Spark 看不到大量文件导致的性能问题的问题。例如,如果我使用整个 300 GB 数据帧,对 1000 个分区(在 spark 中)和相关列进行重新分区,然后将其转储到磁盘,所有这些都并行转储,并在大约 9 分钟内完成整个事情。但这会为较大的分区提供多达 1000 个文件,这会破坏 Hive 的性能。或者它会破坏某种性能,老实说不是 100% 确定是什么。我刚刚被要求尽量减少文件数量。使用我正在使用的方法,我可以将文件保持在我想要的任何大小(无论如何都相对接近),但是没有并行性,运行大约需要 45 分钟,主要是等待文件写入。
在我看来,由于某些源行和某些目标行之间存在一对一的关系,并且由于我可以将数据组织到不重叠的“文件夹”(Hive 的分区)中,所以我应该能够组织我的代码/数据帧以这样一种方式,我可以要求 spark 并行编写所有目标文件。有人对如何攻击这个有建议吗?
我测试过的东西不起作用:
使用 Scala 并行集合开始写入。无论 spark 对 DataFrame 做什么,它都没有很好地分离任务,并且一些机器遇到了大量的垃圾收集问题。
DataFrame.map - 我试图映射唯一组合的 DataFrame,并从那里开始写入,但无法从其中访问我实际需要的数据
map
的 DataFrame - DataFrame 引用在执行程序上为空。DataFrame.mapPartitions - 一个非入门者,无法从 mapPartitions 内部提出任何想法来做我想做的事情
“分区”这个词在这里也不是特别有用,因为它既指 spark 按某些标准拆分数据的概念,也指 Hive 在磁盘上组织数据的方式。我想我在上面的用法中很清楚。因此,如果我想为这个问题提供一个完美的解决方案,那就是我可以创建一个基于三个属性的具有 1000 个分区的 DataFrame 以进行快速查询,然后从中创建另一个 DataFrame 集合,每个 DataFrame 都具有一个独特的组合这些属性,重新分区(在 spark 中,但对于 Hive),分区数与其包含的数据大小相适应。大多数 DataFrame 将有 1 个分区,少数将有多达 10 个。文件应该是 ~3 GB,我们的 EMR 集群的 RAM 比每个执行程序的 RAM 多,
一旦创建了 DataFrame 列表并且每个都重新分区,我可以要求 spark 将它们全部并行写入磁盘。
像这样的东西在火花中可能吗?
我在概念上不清楚的一件事:说我有
val x = spark.sql("select * from source")
和
val y = x.where(s"date=$date and hour=$hour and anotherAttr=$anotherAttr")
和
val z = x.where(s"date=$date and hour=$hour and anotherAttr=$anotherAttr2")
与 DataFrame 有多大y
不同z
?如果我重新分区,y
洗牌有什么影响?z
x
解决方案
这个说法:
我将它们收集到驱动程序上的一个列表中,并遍历列表,为每个组合构建一个新的 DataFrame,使用行数重新分区该 DataFrame 以推测文件大小,并使用 DataFrameWriter 将文件写入磁盘,.orc 完成它离开。
就 Spark 而言,它完全偏离光束。收集到驱动程序从来都不是一个好方法,体积和 OOM 问题以及您的方法中的延迟很高。
使用以下命令可以简化并获得 Spark 的并行性,从而为您的老板节省时间和金钱:
df.repartition(cols...)...write.partitionBy(cols...)...
洗牌是通过发生repartition
的,从来没有洗牌过partitionBy
。
就这么简单,使用 Spark 的默认并行性。
推荐阅读
- excel - 从 Excel 中的表进行条件复制
- xml - saxon 将目录作为命令行参数传递给 XSLT 2.0 样式表
- ruby - 如何将两个 XML 文件与 Nokogiri 合并
- go - 从函数约定 Golang 返回地址
- ios - 如何修复我的代码中的 AVAudioPlayer 问题?
- boost - 无法链接到 cmake 导入的目标
- excel - 设置多选项字段的linkedcell
- pine-script - 松脚本检查未来吧
- javascript - 使用javascript删除url参数
- python-3.x - pySpark 问题:map/parallelize 未按预期工作