apache-spark - spark coalesce(20) 覆盖 repartition(1000).groupby(xxx).apply(func) 的并行性
问题描述
注意:这不是问coalesce和repartition之间的区别的问题,有很多问题谈论这个,我的不同。
我有一份 pysaprk 工作
df = spark.read.parquet(input_path)
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
...
return pdf
df = df.repartition(1000, 'store_id', 'product_id')
df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)
df1 = df1.withColumnRenamed('y', 'yhat')
print('Partition number: %s' % df.rdd.getNumPartitions())
df1.write.parquet(output_path, mode='overwrite')
默认 200 分区需要大内存,所以我将 repartition 更改为 1000。
由于输出只有44M,我尽量coalesce
避免使用太多的小文件减慢hdfs。我所做的只是.coalesce(20)
在之前添加.write.parquet(output_path, mode='overwrite')
:
df = spark.read.parquet(input_path)
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
...
return pdf
df = df.repartition(1000, 'store_id', 'product_id')
df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)
df1 = df1.withColumnRenamed('y', 'yhat')
print('Partition number: %s' % df.rdd.getNumPartitions()) # 1000 here
df1.coalesce(20).write.parquet(output_path, mode='overwrite')
然后 spark webui 显示:
看起来只有 20 个任务正在运行。
当 repartion(1000) 时,并行度取决于我的 vcores 数,这里是 36。我可以直观地跟踪进度(进度条大小为 1000 )。在 coalesce(20) 之后,之前的 repartion(1000) 失去了功能,并行度下降到 20 ,也失去了直觉。添加coalesce(20)
会导致整个工作卡住并在没有通知的情况下失败。
改变coalesce(20)
工作repartition(20)
,但根据文件,coalesce(20)
效率更高,不应该引起这样的问题。
我想要更高的并行度,并且只有结果合并为 20 。正确的方法是什么?
解决方案
coalesce
Spark 优化器将其视为一个狭窄的转换,因此它将创建一个从您的 groupby 到输出的 WholeStageCodegen 阶段,从而将您的并行度限制为 20。
repartition
是一个广泛的转换(即强制洗牌),当您使用它而不是coalesce
添加新的输出阶段但保留 groupby-train 并行性时。
repartition(20)
在您的用例中是一个非常合理的选择(洗牌很小,因此成本很低)。
另一种选择是明确阻止 Spark 优化器合并您的预测和输出阶段,例如通过使用cache
或persist
在您的合并之前:
# Your groupby code here
from pyspark.storagelevel import StorageLevel
df1.persist(StorageLevel.MEMORY_ONLY)\
.coalesce(20)\
.write.parquet(output_path, mode='overwrite')
鉴于您的输出大小较小, MEMORY_ONLY persist + coalesce 应该比重新分区更快,但是当输出大小增加时,这并不成立
推荐阅读
- wordpress - 从 woocommerce 中的变量(客户余额)添加购物车费用
- javascript - React.memo:使用自定义函数比较 React 组件 props 的灵活性和效率
- javascript - message.guild.createRole 不起作用 - discord.js
- angular - 如何在 RadListView 中使用 ScollView
- python - 无法使用 boto3 列出存储桶中的所有对象,但可以使用 AWS CLI
- java - 我想使用 Java 在 Android 中使用 Cloud Firestore 从最新到最旧对数据进行排序
- reactjs - 如何在 index.js 中配置 react-router-dom、react-router-redux?
- ios - 如何将我的 NSString 分隔为两个不同的数组,即 Objective-C 中的键和值?
- r - 如何在 VS 代码中使用带有 R 内核的 jupyter notebook?
- macos - 在高塞拉上找不到冲泡命令