apache-spark - 在 PySpark groupBy 中,如何按组计算执行时间?
问题描述
我正在将 PySpark 用于一个大学项目,其中我有大型数据框,并且我应用了 PandasUDF,使用groupBy
. 基本上,调用如下所示:
df.groupBy(col).apply(pandasUDF)
我在我的 Spark 配置 ( SparkConf().setMaster('local[10]')
) 中使用了 10 个内核。
目标是能够报告每个组运行我的代码所花费的时间。我想要每组完成的时间,以便我可以取平均值。我也对计算标准偏差感兴趣。
我现在正在使用已清理的数据进行测试,我知道这些数据将分为 10 组,并且我让 UDF 使用time.time()
. 但是,如果我要使用更多组,这将是不可能的(对于上下文,我的所有数据将被分成 3000 个左右的组)。有没有办法测量每组的执行时间?
解决方案
如果不想将执行时间打印到标准输出,您可以将其作为 Pandas UDF 的额外列返回,例如
@pandas_udf("my_col long, execution_time long", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
start = datetime.now()
# Some business logic
return pdf.assign(execution_time=datetime.now() - start)
或者,要计算驱动程序应用程序中的平均执行时间,您可以使用两个Accumulators在 UDF 中累积执行时间和 UDF 调用次数。例如
udf_count = sc.accumulator(0)
total_udf_execution_time = sc.accumulator(0)
@pandas_udf("my_col long", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
start = datetime.now()
# Some business logic
udf_count.add(1)
total_udf_execution_time.add(datetime.now() - start)
return pdf
# Some Spark action to run business logic
mean_udf_execution_time = total_udf_execution_time.value / udf_count.value
推荐阅读
- python - redis 不遵循 redis.conf 设置
- linux - 检查服务的 ram 使用情况的 linux 命令是什么?
- python - 在使用序列化程序时,反向关系如何在 django 模型中工作?
- java - ArrayBlockingQueue NoSuchElementException
- javascript - 将图像从 json 文件显示为 html
- wpf - 将 IDataErrorInfo 链接到单独类中的现有数据模型类
- redux - Redux 的意思是有一个特定的函数占据应用程序的整个状态?
- javascript - 如何创建“浮动”和“位移”效果?使用反应,CSS3
- javascript - 从锚标记中删除默认 url
- c - (GLib) 如何创建和发出 GSignals?