首页 > 解决方案 > spark coalesce(20) 覆盖 repartition(1000).groupby(xxx).apply(func) 的并行性

问题描述

注意:这不是问coalesce和repartition之间的区别的问题,有很多问题谈论这个,我的不同。

我有一份 pysaprk 工作

df = spark.read.parquet(input_path)

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    ...
    return pdf

df = df.repartition(1000, 'store_id', 'product_id')
df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)

df1 = df1.withColumnRenamed('y', 'yhat')

print('Partition number: %s' % df.rdd.getNumPartitions())

df1.write.parquet(output_path, mode='overwrite')

默认 200 分区需要大内存,所以我将 repartition 更改为 1000。

spark webui 上的作业详细信息如下所示: 在此处输入图像描述

由于输出只有44M,我尽量coalesce避免使用太多的小文件减慢hdfs。我所做的只是.coalesce(20)在之前添加.write.parquet(output_path, mode='overwrite')

df = spark.read.parquet(input_path)

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    ...
    return pdf

df = df.repartition(1000, 'store_id', 'product_id')
df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)

df1 = df1.withColumnRenamed('y', 'yhat')

print('Partition number: %s' % df.rdd.getNumPartitions())  # 1000 here

df1.coalesce(20).write.parquet(output_path, mode='overwrite')

然后 spark webui 显示:

在此处输入图像描述

看起来只有 20 个任务正在运行。

当 repartion(1000) 时,并行度取决于我的 vcores 数,这里是 36。我可以直观地跟踪进度(进度条大小为 1000 )。在 coalesce(20) 之后,之前的 repartion(1000) 失去了功能,并行度下降到 20 ,也失去了直觉。添加coalesce(20)会导致整个工作卡住并在没有通知的情况下失败。

改变coalesce(20)工作repartition(20),但根据文件,coalesce(20)效率更高,不应该引起这样的问题。

我想要更高的并行度,并且只有结果合并为 20 。正确的方法是什么?

标签: apache-sparkpyspark

解决方案


coalesceSpark 优化器将其视为一个狭窄的转换,因此它将创建一个从您的 groupby 到输出的 WholeStageCodegen 阶段,从而将您的并行度限制为 20。

repartition是一个广泛的转换(即强制洗牌),当您使用它而不是coalesce添加新的输出阶段但保留 groupby-train 并行性时。

repartition(20)在您的用例中是一个非常合理的选择(洗牌很小,因此成本很低)。

另一种选择是明确阻止 Spark 优化器合并您的预测和输出阶段,例如通过使用cachepersist在您的合并之前:

# Your groupby code here

from pyspark.storagelevel import StorageLevel

df1.persist(StorageLevel.MEMORY_ONLY)\
   .coalesce(20)\
   .write.parquet(output_path, mode='overwrite')

鉴于您的输出大小较小, MEMORY_ONLY persist + coalesce 应该比重新分区更快,但是当输出大小增加时,这并不成立


推荐阅读