python - Spark:通过 UDF 分发少量计算密集型任务
问题描述
我有一个带有 5 个可用于计算的工作节点的 spark 集群(在 Azure Databricks 中)。但是,我需要解决的任务与典型的 spark 用例不同:我必须对 60 行数据运行非常复杂的操作,而不是需要应用到数百万行的简单任务。
我的意图是基本上将60个任务分配给5个worker,让每个worker处理60/5 = 12
任务。为此,我知道执行者的数量应该等于工人的数量。这似乎是这种情况,如运行所示
num_executors = len(spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()) - 1
# returns 5
这是一些运行的简单伪代码,但仅在单个工作人员上运行:
def my_complex_function(input):
# this function uses all available cores (internally parallelized)
# and takes about 15 minutes to complete per call if run on a
# single worker node
do_stuff(input_row)
write_output_to_file(stuff)
return(debug_message)
UDF_function = udf(lambda z: my_complex_function(input_row), StringType())
sdf = spark.createDataFrame(data=data,schema=["data"])
# sdf contains 60 rows and a single column, "data".
# "data" is just a path to blob storage file that needs to be processed.
sdf_new = sdf.withColumn("output", UDF_function(col("data")))
display(sdf_new) # <- Triggers the computation
如前所述,这似乎只在单个工作人员上运行。我认为这是因为我的数据集非常小,它没有分发给不同的工作人员 - 我试图通过以下方式解决这个问题:
sdf = sdf.repartition(num_executors)
但是,这仍然不起作用。正如 Spark UI 和我的日志文件所示,只使用了一个工作人员。
我需要设置什么才能让每个执行者并行运行他们的任务份额?
解决方案
该display
函数推测调度尽可能少的任务,以产生其上限为 1000 行的输出。它首先安排一项任务,并希望这足够了。然后是 4、20... 等等。在您的情况下,这需要很长时间。
您可以尝试在驱动程序处收集所有内容:
sdf_new.collect()
通过在驱动程序中收集所有内容,您肯定会触发对数据帧的完整评估。
推荐阅读
- flutter - 在全局 const 函数中使用时 Sizer 包错误
- jquery - ExpressJS - 如何使用 Auth RestAPI 收到的 JWT
- javascript - 无法在 Chrome 中加载用 HTML 和 JavaScript 编写的本地页面进行测试
- php - 如何验证字符串是否包含有效的 MySQL VARCHAR 数据类型?
- typescript - 打字稿中的破坏对象
- c# - 按搜索字符串中的每个单词搜索名称
- python - 使用 aiomysql 执行多个 SQL 查询
- sas - SAS 组合 PROC MEANS 的类值范围
- javafx - 选择仅适用于 TextFlow 中的第一个 Text 节点
- flutter - 设置所选文本的颜色