Python multiprocessing.Process raises AssertionError when trying to return results?

Problem description

I am trying to use multiprocessing for a task that is very slow when run as a single process. As you can see in the code below, each process is supposed to return some results (return_dict). I initially tested this code with a 10K-row dataset (the data is stored in the file docs.txt, about 70 MB), and the code ran as expected. However, when I ran the script on the full dataset (about 5.6 GB), I got the AssertionError shown at the bottom of the question. I would like to know if anyone has an idea what might be causing it and how I could avoid it. Thanks.

from multiprocessing import Process, Manager
import os, io, numpy
from gensim.models.doc2vec import Doc2Vec

def worker(i, data, return_dict):
    model = Doc2Vec.load("D:\\Project1\\doc2vec_model_DM_20180814.model")
    results = numpy.zeros((len(data), model.vector_size))
    for id, doc in enumerate(data):
        results[id,:] = model.infer_vector(doc, alpha = 0.01, steps = 100)
    return_dict[i] = results

if __name__ == '__main__':
    import time
    a  = time.time()
    path = "D:\\Project1\\docs.txt"    # <<=== data stored in this file
    data = []
    manager = Manager()
    jobs = []
    return_dict = manager.dict()

    with io.open(path, "r+", encoding = "utf-8") as datafile:
        for id, row in enumerate(datafile):
            row = row.strip().split('\t')[0].split()
            data.append(row)

    step = numpy.floor(len(data)/20)
    intervals = numpy.arange(0, len(data), step = int(step)).tolist()
    intervals.append(len(data))

    for i in range(len(intervals) - 1):
        p = Process(target=worker, args=(i, data[intervals[i]:intervals[i+1]], return_dict))
        jobs.append(p)
        p.start()
    for proc in jobs:
        proc.join()

    results = numpy.zeros((len(data), 1000))
    start = 0
    end = 0
    for _, result in return_dict.items():    #<<===Where error happens
        end = end + result.shape[0]
        results[start:end,:] = result[:,:]
        start = end

    print(time.time() - a)

Error message:

Traceback (most recent call last):
  File "D:\Project1\multiprocessing_test.py", line 43, in <module>
    for _, result in return_dict.items():
  File "<string>", line 2, in items
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\managers.py", line 757, in _callmethod
    kind, result = conn.recv()
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 318, in _recv_bytes
    return self._get_more_data(ov, maxsize)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 337, in _get_more_data
    assert left > 0
AssertionError

Tags: python, python-3.x, multiprocessing, python-multithreading

Solution


Here is my case and solution. Hope this helps! I have a processing function called "func":

partial_func = partial(func,a=params1,b=params2)
for i, _ in enumerate(pool.imap(partial_func, [1])):
    pass

The root cause was that the params1 and params2 I passed to partial_func were too large.

