首页 > 解决方案 > PySpark - 在镶木地板读取后优化分区数

问题描述

year在一个由和划分的镶木地板数据湖month中,spark.default.parallelism设置为 ie 4,假设我想创建一个 DataFrame ,该数据帧由 2017 年的第 11~12 个月和 2018 年的第 1~3 个月的两个来源AB.

df = spark.read.parquet(
    "A.parquet/_YEAR={2017}/_MONTH={11,12}",
    "A.parquet/_YEAR={2018}/_MONTH={1,2,3}",
    "B.parquet/_YEAR={2017}/_MONTH={11,12}",
    "B.parquet/_YEAR={2018}/_MONTH={1,2,3}",
)

如果我得到分区数,spark.default.parallelism则默认使用 Spark:

df.rdd.getNumPartitions()
Out[4]: 4

考虑到创建后df我需要在每个时期执行joingroupBy操作,并且数据或多或少均匀分布在每个时期(每个时期大约 1000 万行):

问题

标签: apache-sparkpysparkpartitioningparquet

解决方案


重新分区会提高我后续操作的性能吗?

通常不会。Dataset抢先对数据进行重新分区的唯一原因是,当基于相同的条件将相同的数据用于多个连接时,避免进一步洗牌

如果是这样,如果我有 10 个不同的时期(A 和 B 每年 5 个),我是否应该按时期数重新分区并明确引用要重新分区的列 (df.repartition(10,'_MONTH','_YEAR') )?

让我们一步一步来:

  • 我应该按周期数重新分区吗

    从业者不保证级别和分区之间的 1:1 关系,所以唯一要记住的是,您不能拥有比唯一键更多的非空分区,因此使用显着更大的值没有意义。

  • 并明确引用要重新分区的列

    如果您repartition随后joingroupBy对这两个部分使用相同的列集是唯一明智的解决方案。

概括

repartitoningbefore join 在两种情况下有意义:

  • 如果有多个后续joins

    df_ = df.repartition(10, "foo", "bar")
    df_.join(df1, ["foo", "bar"])
    ...
    df_.join(df2, ["foo", "bar"])
    
  • 当所需的输出分区数量不同时使用单连接spark.sql.shuffle.partitions(并且没有广播连接)

    spark.conf.get("spark.sql.shuffle.partitions")
    # 200
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    
    df1_ = df1.repartition(11, "foo", "bar")
    df2_ = df2.repartition(11, "foo", "bar")
    
    df1_.join(df2_, ["foo", "bar"]).rdd.getNumPartitions()
    # 11
    
    df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions()
    # 200
    

    这可能比:

    spark.conf.set("spark.sql.shuffle.partitions", 11)
    df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions()
    spark.conf.set("spark.sql.shuffle.partitions", 200)
    

推荐阅读