首页 > 解决方案 > 当条件运行时间太长时,pyspark multiple

问题描述

我需要根据两种不同条件的组合向我的 DF 添加新列,其中一个是相同的,另一个是不同的:

如果操作是“同步”并且消息包含“已删除” - 然后删除

如果操作是“同步”并且消息包含“更新” - 然后更新

如果操作是“同步”并且消息包含“已创建” - 然后创建

import pyspark.sql.functions as f

operation = "SYNC"
deleted = "was deleted: true"
created = "was created: true"
updated = "was updated: true"
df1 = df.withColumn(
    "synced",
    f.when(
        f.col("operation").contains(operation) & f.col("message").contains(deleted),
        f.lit("deleted"),
    )
    .when(
        f.col("operation").contains(operation) & f.col("message").contains(created),
        f.lit("created"),
    )
    .when(
        f.col("operation").contains(operation) & f.col("message").contains(updated),
        f.lit("updated"),
    ),
)

此代码工作正常,这意味着它可以完成工作。但我的问题是它运行时间太长:对于大约 1.68 亿行的 DF,它运行大约 20 分钟。

我在这个 DF 中多次搜索“消息”字段以查找不同的字符串,通常运行不到一分钟,但对于这种特定情况,运行时间太长了。

为了减少此脚本的运行时间,我可以做些什么不同的事情?

标签: pythonpysparkcase

解决方案


编辑我的答案以匹配您的评论

找出瓶颈可能有点费时。您的代码中有几个转换,在每个转换之后,添加一个动作,例如显示或收集并测量时间。您将能够找到添加最多的转换。当然,最后一个动作会比第一个动作花费更多的时间,因为它是你所有变换的总和,但每次变换后比较一下,你就能知道某一刻是否有巨大的差距。

另一种分析方法是阅读解释计划。你做了所有的转变,你做df.explain()了并试图了解你的瓶颈在哪里。您也可以将其添加到您的问题中,有人可能会看到问题来自哪里。


推荐阅读