python - UDF 中的 Pyspark 和局部变量
问题描述
当我定义一个局部变量(例如一个庞大的复杂对象列表)并在 pyspark 的 UDF 中使用它时,究竟会发生什么。让我以此为例:
huge_list = [<object_1>, <object_2>, ..., <object_n>]
@udf
def some_function(a, b):
l = []
for obj in huge_list:
l.append(a.operation(obj))
return l
df2 = df.withColumn('foo', some_function(col('a'), col('b')))
是自动播的吗?还是节点每次都与主节点通信以获取其数据?这种方法对我有什么性能惩罚?有更好的吗?huge_list
(考虑到每次应用 UDF 时从头开始构建会更糟糕)
解决方案
查看代码,可以看到以下情况:每个 udf调用一次此函数,该函数通过CloudPickleSerializer
. 它还具有将腌制可调用文件的大小与硬编码阈值1Mb 进行比较的逻辑。如果大小较大,则广播腌制命令并腌制一个类型的对象pyspark.broadcast.Broadcast
(其序列化值显然非常短,因为该对象几乎是一个引用)。读取腌制可调用的地方似乎在这里. 我的理解是,执行者从头开始为每个新任务执行创建一个 python 进程。对于每个使用的 udf ,它将获得腌制命令并将其取消腌制,或者(对于广播)将需要从 JVM 获取广播的值并将其取消腌制。
据我了解,如果pyspark.broadcast.Broadcast
在此处创建对象,则所有执行程序都将为该执行程序将创建的 python worker.py 进程的所有未来查找保留其值。
因此,如果您想回答某个函数是否会被广播的问题,您可以重复 pyspark 所做的相同操作并查看腌制对象是否大于 1Mb,例如:
from pyspark.serializers import CloudPickleSerializer
ser = CloudPickleSerializer()
x = [i**2 for i in range(10**5)]
v = ser.dumps(lambda : x)
print(len(v)) # 607434 - less than 1Mb, won't be broadcast
关于替代方法,我认为我看到的唯一替代方法(除了每次调用 udf'ed 函数时创建新对象,这已经被解释为过于昂贵)将创建一个模块,该模块将在导入期间创建相关对象。在这种情况下,将为每个任务执行创建一次对象。因此,这几乎可以让您选择(a)每次任务执行反序列化一次对象,CloudPickleSerializer
如果您只允许 udf 函数捕获它,或者(b)通过导入模块在每次任务执行时创建一次对象。更快的是一个单独的问题 - 但我想答案可能取决于所讨论的对象。在每种情况下,它似乎都相当容易衡量。
推荐阅读
- regex - 如何使用 RewriteCond 匹配特定 URL
- javascript - 如何防止出现蓝色突出显示
- c - 如何搜索 for 循环 C 中的每个元素
- java - 获取复合键的 generatedkey()
- cocos2d-x - 为 windows 64 位编译 cocos2dx 游戏
- ios - 我在 Testflight 中看不到我在 x-code 测试中构建的分支
- makefile - 删除 Makefile 中的重复:目标文件在 Phony 变量 `all` 下写入一次,然后再次作为配方目标
- python - 未定义围绕“TimeoutException”的 Selenium 工作?
- sql - sql窗口函数没有给我正确的输出
- c - 如何使 printf 中的文本出现在 C 中的 scanf 之后?