首页 > 解决方案 > 多处理 API

问题描述

# Number of cross-validation folds used when scoring the model.
num_folds = 3

def callModelScore(model, datax, datay, scoringType, folds):
    """Cross-validate ``model`` and print its mean score as a percentage.

    Args:
        model: Estimator handed to ``model_selection.cross_val_score``.
        datax: Feature data to cross-validate on.
        datay: Target values corresponding to ``datax``.
        scoringType: Scoring name (e.g. ``'accuracy'``); it is forwarded as
            ``scoring`` and also used as the label of the printed line.
        folds: Value forwarded as ``cv``.

    Returns:
        Whatever ``cross_val_score`` returns (it must support ``.mean()``).
    """
    # Score what the caller passed in. The previous version ignored
    # model/datax/datay/folds and read the globals gnb, X, y and num_folds.
    # NOTE(review): model_selection is presumably sklearn.model_selection;
    # its import is not visible in this snippet.
    rating = model_selection.cross_val_score(
        model, datax, datay, scoring=scoringType, cv=folds)
    # Mean score as a percentage, rounded to two decimals.
    meanRating = str(round(100 * rating.mean(), 2))
    print(scoringType + " " + meanRating + "%")
    return rating

from multiprocessing import Process

# Launch the workers only when this file is the entry point. With the "spawn"
# start method each child re-imports the main module, so unguarded
# Process(...).start() calls would run again inside every child.
# NOTE(review): the target function must also be importable by the children;
# a function defined only in an interactive session or notebook cell may not
# be. If the error persists, move callModelScore into its own module.
if __name__ == "__main__":
    workers = [
        Process(target=callModelScore,
                args=(gnb, X, y, 'accuracy', num_folds))
        for _ in range(4)
    ]
    # Keep the original names bound for any later code that refers to them.
    p1, p2, p3, p4 = workers

    # Start every worker first so they run concurrently, then wait for all.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

错误:

BrokenPipeError                           Traceback (most recent call last)
<ipython-input-22-0aaad613b937> in <module>
      5 p3 = Process(target =  callModelScore,args =(gnb, X, y, 'accuracy', num_folds,))
      6 p4 = Process(target =  callModelScore,args =(gnb, X, y, 'accuracy', num_folds,))
----> 7 p1.start()
      8 p2.start()
      9 p3.start()

~\Anaconda3\lib\multiprocessing\process.py in start(self)
    110                'daemonic processes are not allowed to have children'
    111         _cleanup()
--> 112         self._popen = self._Popen(self)
    113         self._sentinel = self._popen.sentinel
    114         # Avoid a refcycle if the target function holds an indirect

~\Anaconda3\lib\multiprocessing\context.py in _Popen(process_obj)
    221     @staticmethod
    222     def _Popen(process_obj):
--> 223         return _default_context.get_context().Process._Popen(process_obj)
    224
    225 class DefaultContext(BaseContext):

~\Anaconda3\lib\multiprocessing\context.py in _Popen(process_obj)
    320         def _Popen(process_obj):
    321             from .popen_spawn_win32 import Popen
--> 322             return Popen(process_obj)
    323
    324     class SpawnContext(BaseContext):

~\Anaconda3\lib\multiprocessing\popen_spawn_win32.py in __init__(self, process_obj)
     87             try:
     88                 reduction.dump(prep_data, to_child)
---> 89                 reduction.dump(process_obj, to_child)
     90             finally:
     91                 set_spawning_popen(None)

~\Anaconda3\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
     58 def dump(obj, file, protocol=None):
     59     '''Replacement for pickle.dump() using ForkingPickler.'''
---> 60     ForkingPickler(file, protocol).dump(obj)
     61
     62 #

BrokenPipeError: [Errno 32] Broken pipe

我正在尝试使用 Python 的 multiprocessing API,但始终无法让它正常工作。我甚至试过 Python 3 官方文档中的一些示例代码,结果也一样。我是在 Anaconda 环境下使用 Jupyter Notebook。

标签: python, python-multiprocessing

解决方案


我不是这方面的专家,但我相信 Jupyter Notebook 本身已经在使用 pickle 进行数据序列化。这意味着子进程的数据流与生成它们的主进程的数据流之间会产生歧义。幸运的是,multiprocessing 似乎有一个正在积极维护的分支项目,请参阅 multiprocess API(注意,它与 multiprocessing 只差一个 “ing”)。该 API 使用 dill 代替 pickle。除了名字有趣之外,它还应该能让你的 shell 区分子进程和主进程。另外值得一提的是,multiprocessing 的官方文档中有如下说明:

此包中的功能要求子进程能够导入 `__main__` 模块。这一点在编程指南中有介绍,但值得在这里再次指出。这意味着某些示例(例如 multiprocessing.pool.Pool 的示例)无法在交互式解释器中运行。


推荐阅读