apache-spark-sql - 在 hdfs 中写入小文件或使用 coalesce
问题描述
我正在使用 sql spark 在 hdfs 中读取和写入文件,使用以下代码:
val df = spark.read.parquet("D:/resources/input/address/year_month_day=2018-07-02")
val df.write.mode("overwrite").parquet("D:/resources/output/t_kcdo_person")
写入的结果是很多小 文件 。根据我所了解的,不建议文件的大小小于 128 MB。我一直在寻找减少文件但尺寸更大的方法,我找到了函数 df.coalesce,但我有一个问题,是否建议使用此函数,因为它会避免并行性。
解决方案
这是 Spark 中的一个已知问题。无论数据的实际大小如何,每个分区都会输出一个文件。coalesce
也不是灵丹妙药 - 您需要非常小心新的分区数量 - 太小并且应用程序将 OOM。您可能会考虑在运行时计算合并值,但在大多数情况下,这意味着将数据保存到磁盘,获取实际大小,然后再次读取并coalesce
达到最佳大小。
在您的简单示例中,您可以事先获得实际的输入大小。但是对于一般情况,有像FileCrush这样的工具可以对您的输出(小文件)进行操作并将它们合并到更少的文件中。但是它现在很旧并且基于map-reduce(尽管仍然有效)。在我的团队中,我们构建了一个非常简单的 Spark 版本,您可以轻松创建自己的版本。如果这样做,请记住在计算最佳分区数时考虑压缩编解码器。
此外,您在使用coalesce
. 如果这成为一个问题,并且您有一些计算应该在 之前运行在更高级别的并行度上coalesce
,您可以使用类似spark.createDataFrame(df.rdd, df.schema)
创建一个新数据帧的方法来避免将coalesce
下推得太低。但是,这具有您需要考虑的重要含义。
推荐阅读
- excel - Excel - 使用多个标准(索引/匹配)在相邻的列中显示多个结果?
- javascript - 在“react-pdf”中加载所有 pdf 页面时重新渲染问题
- django - Django TypeError:得到了一个意外的关键字参数'model_name'
- c++ - 澄清 C 和 C++ 中结构的 ODR 规则的差异
- php - PHP连续减法两个数组
- kotlin - 这个问题是否与桥接设计模式匹配
- c++ - C ++:使用适当的序列号创建向量
- finance - 从接收tick数据到生成1分钟K线的实时处理
- android - 如何在android项目的线性布局上添加相邻的文本视图和编辑框(带有材料设计的编辑框)?
- python - 使用导入脚本的关系路径在导入脚本中导入数据 (Python)