python - 当条件运行时间太长时,pyspark multiple
问题描述
我需要根据两种不同条件的组合向我的 DF 添加新列,其中一个是相同的,另一个是不同的:
如果操作是“同步”并且消息包含“已删除” - 然后删除
如果操作是“同步”并且消息包含“更新” - 然后更新
如果操作是“同步”并且消息包含“已创建” - 然后创建
import pyspark.sql.functions as f
operation = "SYNC"
deleted = "was deleted: true"
created = "was created: true"
updated = "was updated: true"
df1 = df.withColumn(
"synced",
f.when(
f.col("operation").contains(operation) & f.col("message").contains(deleted),
f.lit("deleted"),
)
.when(
f.col("operation").contains(operation) & f.col("message").contains(created),
f.lit("created"),
)
.when(
f.col("operation").contains(operation) & f.col("message").contains(updated),
f.lit("updated"),
),
)
此代码工作正常,这意味着它可以完成工作。但我的问题是它运行时间太长:对于大约 1.68 亿行的 DF,它运行大约 20 分钟。
我在这个 DF 中多次搜索“消息”字段以查找不同的字符串,通常运行不到一分钟,但对于这种特定情况,运行时间太长了。
为了减少此脚本的运行时间,我可以做些什么不同的事情?
解决方案
编辑我的答案以匹配您的评论
找出瓶颈可能有点费时。您的代码中有几个转换,在每个转换之后,添加一个动作,例如显示或收集并测量时间。您将能够找到添加最多的转换。当然,最后一个动作会比第一个动作花费更多的时间,因为它是你所有变换的总和,但每次变换后比较一下,你就能知道某一刻是否有巨大的差距。
另一种分析方法是阅读解释计划。你做了所有的转变,你做df.explain()
了并试图了解你的瓶颈在哪里。您也可以将其添加到您的问题中,有人可能会看到问题来自哪里。
推荐阅读
- python - 如何保存 GridSearchCV xgboost 模型?
- matlab - 如何使用标题循环遍历 Matlab 表的列?
- bash - 标记字符串 - 错误的结果
- powershell - Powershell / Visual Studio Code: Update version of Powershell in Visual Studio Code
- graphics - 如何在 Spark AR Studio 中的 faceMesh 上创建简单的模糊效果?
- angular - 使用 Mat-Select 过滤 Observable 数组
- reactjs - 将 Button 组件传递给 Bootstrap Tab 组件的 Title 属性
- python - 熊猫重命名列名
- jaxb - 由于 580 次 IllegalAnnotationExceptions 创建 JAXB Marshaller 时出现 jbpm 错误
- firebase - 将公共之外的根文件夹添加到托管 Firebase