首页 > 解决方案 > 为什么 repartition() 方法会增加磁盘上的文件大小?

问题描述

我正在使用的数据湖 ( df) 有 2 TB 的数据和 20,000 个文件。我想将数据集压缩为 2,000 个 1 GB 文件。

如果您运行df.coalesce(2000)并写入磁盘,则数据湖包含 1.9 TB 的数据。

如果您运行df.repartition(2000)并写入磁盘,则数据湖包含 2.6 TB 的数据。

数据湖中的每个文件repartition()都比预期大 0.3 GB(它们都是 1.3 GB 的文件,而不是 1 GB 的文件)。

为什么该repartition()方法会增加整个数据湖的大小?

一个相关问题讨论了为什么运行聚合后数据湖的大小会增加。答案说:

一般来说,像 Parquet 这样的列式存储格式在数据分布(数据组织)和单个列的基数方面非常敏感。数据越有条理,基数越低,存储的效率就越高。

提供数据的coalesce()算法是否更有条理……我不这么认为……

我不认为另一个问题回答了我的问题。

标签: apache-spark

解决方案


免责声明

这个答案主要包含推测。对这种现象的详细解释可能需要对输入和输出(或至少它们各自的元数据)进行深入分析。

观察

  1. 熵有效地限制了最强无损压缩的性能-维基百科 - 熵(信息论)
  2. 持久性列格式内部 Spark SQL 表示都透明地应用不同的压缩技术(如运行长度编码字典编码)来减少存储数据的内存占用。

    此外,磁盘格式(包括纯文本数据)可以使用通用压缩算法显式压缩 - 不清楚这里是否是这种情况。

  3. 压缩(显式或透明)应用于数据块(通常是分区,但可以使用更小的单元)。

  4. 基于 1)、2) 和 3) 我们可以假设平均压缩率将取决于集群中数据的分布。我们还应该注意,如果上游谱系包含广泛的转换,则最终结果可能是不确定的。

coalescevs.的可能影响repartition

一般coalesce可以走两条路:

  • 通过管道升级到源 - 最常见的情况。
  • 传播到最近的洗牌。

在第一种情况下,我们可以预期压缩率将与输入的压缩率相当。然而,在某些情况下可以实现更小的最终输出。让我们想象一个退化的数据集:

val df = sc.parallelize(
  Seq("foo", "foo", "foo", "bar", "bar", "bar"),
  6 
).toDF

如果像这样的数据集被写入磁盘,则没有压缩的可能性 - 每个值都必须按原样写入:

df.withColumn("pid", spark_partition_id).show
+-----+---+
|value|pid|
+-----+---+
|  foo|  0|
|  foo|  1|
|  foo|  2|
|  bar|  3|
|  bar|  4|
|  bar|  5|
+-----+---+

换句话说,我们需要大约 6 * 3 个字节,总共 18 个字节。

但是,如果我们合并

df.coalesce(2).withColumn("pid", spark_partition_id).show
+-----+---+
|value|pid|
+-----+---+
|  foo|  0|
|  foo|  0|
|  foo|  0|
|  bar|  1|
|  bar|  1|
|  bar|  1|
+-----+---+

例如,我们可以应用小 int 作为计数的 RLE,并存储每个分区 3 + 1 个字节,总共 8 个字节。

这当然是一个巨大的过度简化,但显示了保留低熵输入结构和合并块如何导致更低的内存占用。

第二种coalesce情况不太明显,但是在某些情况下,上游过程可以减少熵(例如考虑窗口函数)并且保留这​​种结构将是有益的。

怎么样repartition

repartition不应用分区表达式RoundRobinPartitioningHashPartitioning使用基于分区 id 的伪随机键实现)。只要散列函数的行为合理,这种重新分配就应该使数据的熵最大化,从而降低可能的压缩率。

结论

coalesce不应该单独提供任何特定的好处,但可以保留数据分布的现有属性 - 此属性在某些情况下可能是有利的。

repartition,由于其性质,平均而言会使事情变得更糟,除非数据的熵已经最大化(这种情况可能会有所改善,但在非平凡的数据集上极不可能)。

最后repartition用分区表达式还是repartitionByRange应该降低熵,提高压缩率。

注意

我们还应该记住,列格式通常根据运行时统计信息决定特定的压缩/编码方法(或缺少它)。因此,即使特定块中的行集是固定的,但行的顺序发生了变化,我们也可以观察到不同的结果。


推荐阅读