首页 > 解决方案 > UDF 中的 Pyspark 和局部变量

问题描述

当我定义一个局部变量(例如一个庞大的复杂对象列表)并在 pyspark 的 UDF 中使用它时,究竟会发生什么。让我以此为例:

huge_list = [<object_1>, <object_2>, ..., <object_n>]

@udf
def some_function(a, b):
    l = []
    for obj in huge_list:
        l.append(a.operation(obj))
    return l

df2 = df.withColumn('foo', some_function(col('a'), col('b')))

是自动播的吗?还是节点每次都与主节点通信以获取其数据?这种方法对我有什么性能惩罚?有更好的吗?huge_list(考虑到每次应用 UDF 时从头开始构建会更糟糕)

标签: pythonapache-sparkpysparkuser-defined-functions

解决方案


查看代码,可以看到以下情况:每个 udf调用一次此函数,函数通过CloudPickleSerializer. 它还具有将腌制可调用文件的大小与硬编码阈值1Mb 进行比较的逻辑。如果大小较大,则广播腌制命令并腌制一个类型的对象pyspark.broadcast.Broadcast(其序列化值显然非常短,因为该对象几乎是一个引用)。读取腌制可调用的地方似乎在这里. 我的理解是,执行者从头开始为每个新任务执行创建一个 python 进程。对于每个使用的 udf ,它将获得腌制命令并将其取消腌制,或者(对于广播)将需要从 JVM 获取广播的值并将其取消腌制。

据我了解,如果pyspark.broadcast.Broadcast在此处创建对象,则所有执行程序都将为该执行程序将创建的 python worker.py 进程的所有未来查找保留其值。

因此,如果您想回答某个函数是否会被广播的问题,您可以重复 pyspark 所做的相同操作并查看腌制对象是否大于 1Mb,例如:

from pyspark.serializers import CloudPickleSerializer
ser = CloudPickleSerializer()
x = [i**2 for i in range(10**5)]
v = ser.dumps(lambda : x)
print(len(v)) # 607434 - less than 1Mb, won't be broadcast

关于替代方法,我认为我看到的唯一替代方法(除了每次调用 udf'ed 函数时创建新对象,这已经被解释为过于昂贵)将创建一个模块,该模块将在导入期间创建相关对象。在这种情况下,将为每个任务执行创建一次对象。因此,这几乎可以让您选择(a)每次任务执行反序列化一次对象,CloudPickleSerializer如果您只允许 udf 函数捕获它,或者(b)通过导入模块在每次任务执行时创建一次对象。更快的是一个单独的问题 - 但我想答案可能取决于所讨论的对象。在每种情况下,它似乎都相当容易衡量。


推荐阅读