python - 在 multiprocessing.Pool 工作人员之间避免使用全局变量来实现不可提取的共享状态
问题描述
我经常发现自己用 Python 编写程序来构建一个大(兆字节)的只读数据结构,然后使用该数据结构来分析一个非常大(总共数百兆字节)的小记录列表。每条记录都可以并行分析,因此很自然的模式是设置只读数据结构并将其分配给全局变量,然后创建一个multiprocessing.Pool
(通过 将数据结构隐式复制到每个工作进程中fork
)和然后用于imap_unordered
并行处理记录。这种模式的骨架往往看起来像这样:
classifier = None
def classify_row(row):
return classifier.classify(row)
def classify(classifier_spec, data_file):
global classifier
try:
classifier = Classifier(classifier_spec)
with open(data_file, "rt") as fp, \
multiprocessing.Pool() as pool:
rd = csv.DictReader(fp)
yield from pool.imap_unordered(classify_row, rd)
finally:
classifier = None
classify
由于全局变量以及和之间的隐式耦合,我对此不满意classify_row
。理想情况下,我想写
def classify(classifier_spec, data_file):
classifier = Classifier(classifier_spec)
with open(data_file, "rt") as fp, \
multiprocessing.Pool() as pool:
rd = csv.DictReader(fp)
yield from pool.imap_unordered(classifier.classify, rd)
但这不起作用,因为 Classifier 对象通常包含无法腌制的对象(因为它们是由作者不关心的扩展模块定义的);我还读到如果它确实有效,它会非常慢,因为 Classifier 对象会在每次调用绑定方法时被复制到工作进程中。
有更好的选择吗?我只关心 3.x。
解决方案
这出乎意料地棘手。这里的关键是保留对在分叉时可用的变量的读取访问权限,而无需序列化。大多数在多处理中共享内存的解决方案最终都会序列化。我尝试使用 aweakref.proxy
在没有序列化的情况下传入分类器,但这不起作用,因为 dill 和 pickle 都会尝试跟随并序列化所指对象。但是,模块引用有效。
这个组织让我们接近:
import multiprocessing as mp
import csv
def classify(classifier, data_file):
with open(data_file, "rt") as fp, mp.Pool() as pool:
rd = csv.DictReader(fp)
yield from pool.imap_unordered(classifier.classify, rd)
def orchestrate(classifier_spec, data_file):
# construct a classifier from the spec; note that we can
# even dynamically import modules here, using config values
# from the spec
import classifier_module
classifier_module.init(classifier_spec)
return classify(classifier_module, data_file)
if __name__ == '__main__':
list(orchestrate(None, 'data.txt'))
这里需要注意的一些变化:
- 我们添加了
orchestrate
一些 DI 优点的方法;编排计算出如何构造/初始化分类器,并将其交给classify
,将两者解耦 classify
只需要假设classifier
参数有classify
方法;它不关心它是实例还是模块
对于这个概念证明,我们提供了一个显然不可序列化的分类器:
# classifier_module.py
def _create_classifier(spec):
# obviously not pickle-able because it's inside a function
class Classifier():
def __init__(self, spec):
pass
def classify(self, x):
print(x)
return x
return Classifier(spec)
def init(spec):
global __classifier
__classifier = _create_classifier(spec)
def classify(x):
return __classifier.classify(x)
不幸的是,这里仍然有一个全局变量,但它现在被很好地封装在一个模块中作为一个私有变量,并且该模块导出了一个由classify
andinit
函数组成的紧密接口。
这种设计解锁了一些可能性:
orchestrate
可以根据它看到的内容导入和初始化不同的分类器模块classifier_spec
- 也可以将某个
Classifier
类的实例传递给classify
,只要该实例是可序列化的并且具有相同签名的分类方法
推荐阅读
- react-native - React 原生 Redux 离线 API 响应
- sql - 在 MS SQL 中返回 COUNT 行作为 Concat 字符串的一部分
- laravel - 雄辩的默认跳过列,如 deleted_at
- python - python中的二进制文件:缓冲读取
- c# - 如何从异常中获取缺失的引用
- apache - 添加新用户后Htaccess停止工作
- c# - C# 使用自定义 Feed Edge 打印到施乐
- html - 输入元素不接受大写字母
- clip - 当我们剪辑一个 shapefile 时,如何在 arcpy 中获得内部和外部剪辑输出?
- git - 为该 git repo 的每个分支克隆父 jenkins 作业