首页 > 解决方案 > 在 PySpark groupBy 中,如何按组计算执行时间?

问题描述

我正在将 PySpark 用于一个大学项目,其中我有大型数据框,并且我应用了 PandasUDF,使用groupBy. 基本上,调用如下所示:

df.groupBy(col).apply(pandasUDF)

我在我的 Spark 配置 ( SparkConf().setMaster('local[10]')) 中使用了 10 个内核。

目标是能够报告每个组运行我的代码所花费的时间。我想要每组完成的时间,以便我可以取平均值。我也对计算标准偏差感兴趣。

我现在正在使用已清理的数据进行测试,我知道这些数据将分为 10 组,并且我让 UDF 使用time.time(). 但是,如果我要使用更多组,这将是不可能的(对于上下文,我的所有数据将被分成 3000 个左右的组)。有没有办法测量每组的执行时间?

标签: apache-sparkpysparkapache-spark-sqluser-defined-functions

解决方案


如果不想将执行时间打印到标准输出,您可以将其作为 Pandas UDF 的额外列返回,例如

@pandas_udf("my_col long, execution_time long", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
    start = datetime.now()
    # Some business logic
    return pdf.assign(execution_time=datetime.now() - start)

或者,要计算驱动程序应用程序中的平均执行时间,您可以使用两个Accumulators在 UDF 中累积执行时间和 UDF 调用次数。例如

udf_count = sc.accumulator(0)
total_udf_execution_time = sc.accumulator(0)

@pandas_udf("my_col long", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
    start = datetime.now()
    # Some business logic
    udf_count.add(1)
    total_udf_execution_time.add(datetime.now() - start)
    return pdf

# Some Spark action to run business logic

mean_udf_execution_time = total_udf_execution_time.value / udf_count.value

推荐阅读