首页 > 解决方案 > Spark:通过 UDF 分发少量计算密集型任务

问题描述

我有一个带有 5 个可用于计算的工作节点的 spark 集群(在 Azure Databricks 中)。但是,我需要解决的任务与典型的 spark 用例不同:我必须对 60 行数据运行非常复杂的操作,而不是需要应用到数百万行的简单任务。

我的意图是基本上将60个任务分配给5个worker,让每个worker处理60/5 = 12任务。为此,我知道执行者的数量应该等于工人的数量。这似乎是这种情况,如运行所示

num_executors = len(spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()) - 1
# returns 5

这是一些运行的简单伪代码,但仅在单个工作人员上运行:

def my_complex_function(input):
  # this function uses all available cores (internally parallelized)
  # and takes about 15 minutes to complete per call if run on a 
  # single worker node
  do_stuff(input_row)
  write_output_to_file(stuff)
  return(debug_message)

UDF_function = udf(lambda z: my_complex_function(input_row), StringType())
sdf = spark.createDataFrame(data=data,schema=["data"])

# sdf contains 60 rows and a single column, "data".
# "data" is just a path to blob storage file that needs to be processed.

sdf_new = sdf.withColumn("output", UDF_function(col("data")))
display(sdf_new) # <- Triggers the computation

如前所述,这似乎只在单个工作人员上运行。我认为这是因为我的数据集非常小,它没有分发给不同的工作人员 - 我试图通过以下方式解决这个问题:

sdf = sdf.repartition(num_executors)

但是,这仍然不起作用。正如 Spark UI 和我的日志文件所示,只使用了一个工作人员。

我需要设置什么才能让每个执行者并行运行他们的任务份额?

标签: pythonapache-sparkpysparkdatabricksazure-databricks

解决方案


display函数推测调度尽可能少的任务,以产生其上限为 1000 行的输出。它首先安排一项任务,并希望这足够了。然后是 4、20... 等等。在您的情况下,这需要很长时间。

您可以尝试在驱动程序处收集所有内容:

sdf_new.collect()

通过在驱动程序中收集所有内容,您肯定会触发对数据帧的完整评估。


推荐阅读