首页 > 解决方案 > 在 multiprocessing.Pool 工作人员之间避免使用全局变量来实现不可提取的共享状态

问题描述

我经常发现自己用 Python 编写程序来构建一个大(兆字节)的只读数据结构,然后使用该数据结构来分析一个非常大(总共数百兆字节)的小记录列表。每条记录都可以并行分析,因此很自然的模式是设置只读数据结构并将其分配给全局变量,然后创建一个multiprocessing.Pool(通过 将数据结构隐式复制到每个工作进程中fork)和然后用于imap_unordered并行处理记录。这种模式的骨架往往看起来像这样:

classifier = None
def classify_row(row):
    return classifier.classify(row)

def classify(classifier_spec, data_file):
    global classifier
    try:
        classifier = Classifier(classifier_spec)
        with open(data_file, "rt") as fp, \
             multiprocessing.Pool() as pool:
            rd = csv.DictReader(fp)
            yield from pool.imap_unordered(classify_row, rd)
    finally:
        classifier = None

classify由于全局变量以及和之间的隐式耦合,我对此不满意classify_row。理想情况下,我想写

def classify(classifier_spec, data_file):
    classifier = Classifier(classifier_spec)
    with open(data_file, "rt") as fp, \
         multiprocessing.Pool() as pool:
        rd = csv.DictReader(fp)
        yield from pool.imap_unordered(classifier.classify, rd)

但这不起作用,因为 Classifier 对象通常包含无法腌制的对象(因为它们是由作者不关心的扩展模块定义的);我还读到如果它确实有效,它会非常慢,因为 Classifier 对象会在每次调用绑定方法时被复制到工作进程中。

有更好的选择吗?我只关心 3.x。

标签: pythonpython-3.xmultiprocessingforkpython-multiprocessing

解决方案


这出乎意料地棘手。这里的关键是保留对在分叉时可用的变量的读取访问权限,而无需序列化。大多数在多处理中共享内存的解决方案最终都会序列化。我尝试使用 aweakref.proxy在没有序列化的情况下传入分类器,但这不起作用,因为 dill 和 pickle 都会尝试跟随并序列化所指对象。但是,模块引用有效。

这个组织让我们接近:

import multiprocessing as mp
import csv


def classify(classifier, data_file):

    with open(data_file, "rt") as fp, mp.Pool() as pool:
        rd = csv.DictReader(fp)
        yield from pool.imap_unordered(classifier.classify, rd)


def orchestrate(classifier_spec, data_file):
    # construct a classifier from the spec; note that we can
    # even dynamically import modules here, using config values
    # from the spec
    import classifier_module
    classifier_module.init(classifier_spec)
    return classify(classifier_module, data_file)


if __name__ == '__main__':
    list(orchestrate(None, 'data.txt'))

这里需要注意的一些变化:

  • 我们添加了orchestrate一些 DI 优点的方法;编排计算出如何构造/初始化分类器,并将其交给classify,将两者解耦
  • classify只需要假设classifier参数有classify方法;它不关心它是实例还是模块

对于这个概念证明,我们提供了一个显然不可序列化的分类器:

# classifier_module.py
def _create_classifier(spec):

    # obviously not pickle-able because it's inside a function
    class Classifier():

        def __init__(self, spec):
            pass

        def classify(self, x):
            print(x)
            return x

    return Classifier(spec)


def init(spec):
    global __classifier
    __classifier = _create_classifier(spec)


def classify(x):
    return __classifier.classify(x)

不幸的是,这里仍然有一个全局变量,但它现在被很好地封装在一个模块中作为一个私有变量,并且该模块导出了一个由classifyandinit函数组成的紧密接口。

这种设计解锁了一些可能性:

  • orchestrate可以根据它看到的内容导入和初始化不同的分类器模块classifier_spec
  • 也可以将某个Classifier类的实例传递给classify,只要该实例是可序列化的并且具有相同签名的分类方法

推荐阅读