Multiprocessing with partial() raises ForkingPickler error

Problem description

I am trying to scrape abstracts from PubMed and filter them with regular expressions in Python. To speed this up, I want to use Python's multiprocessing pool.

My code looks like this:

import multiprocessing as mp
from functools import partial
from typing import List, Tuple

def beautify_abstract(abstract: str, regex: str):
    import re
    result: str = ""
    last_start = 0
    matches = re.finditer(regex, abstract, re.MULTILINE)
    for matchNum, match in enumerate(matches, start=1):
        result += abstract[last_start:match.start()]
        result += "<b>"
        result += abstract[match.start():match.end()]
        result += "</b>"
        last_start = match.end()
    result += abstract[last_start:]
    return result

def get_doi(pim: str, regex: str):
    from Bio import Entrez
    from Bio.Entrez import efetch
    import re
    from metapub.convert import pmid2doi
    Entrez.email = "Your.Name.Here@example.org"
    print(f"Processing {pim}")
    abstract_handle = efetch(db="pubmed", id=pim, retmode='text', rettype='all')
    abstract = abstract_handle.read()
    abstract_handle.close()

    if re.search(regex, abstract, re.MULTILINE) is not None:
        # efetch() returns a handle; read it once rather than calling .read() twice
        docsum_handle = efetch(db="pubmed", id=pim, retmode='text', rettype='docsum')
        docsum = docsum_handle.read()
        docsum_handle.close()
        try:
            doi = pmid2doi(pim)
        except Exception:
            doi = "UNKNOWN"
        return f"{doi}"
    return ""

def get_pim_with_regex_list(keywords: List[str]) -> List[str]:
    from Bio import Entrez
    Entrez.email = "Your.Name.Here@example.org"
    searchterm = " ".join(keywords)
    pims = []
    handle = Entrez.esearch(db="pubmed", retstart=0, retmax=0, term=searchterm, idtype="acc")
    record = Entrez.read(handle)
    handle.close()
    count = int(record['Count'])
    if count > 100000:
        retmax = 100000
    else:
        retmax = count
    retstart = 0
    while retstart < count:
        handle = Entrez.esearch(db="pubmed", retstart=retstart, retmax=retmax, term=searchterm, idtype="acc")
        record = Entrez.read(handle)
        handle.close()
        for pim in record['IdList']:
            pims.append(pim)
        retstart += retmax
    return pims

if __name__ == '__main__':
    keywords = ["keyword1", "keyword2"]
    pim_list = get_pim_with_regex_list(keywords)
    regex = "keyword1 keyword2"

    worker_fn = partial(get_doi, regex=regex)
    pool = mp.Pool(mp.cpu_count())
    entries = pool.map(worker_fn, pim_list)
    pool.close()
    pool.join()

When I run the code above, I get the following error:

Traceback (most recent call last):
  File "/usr/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/usr/lib/python3.9/multiprocessing/queues.py", line 368, in get
    return _ForkingPickler.loads(res)
TypeError: __new__() missing 2 required positional arguments: 'tag' and 'attributes'
Process ForkPoolWorker-4:
Traceback (most recent call last):
  File "/usr/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/usr/lib/python3.9/multiprocessing/queues.py", line 368, in get
    return _ForkingPickler.loads(res)
TypeError: __new__() missing 2 required positional arguments: 'tag' and 'attributes'

I did some research on Python's multiprocessing and found that only native Python types are supported as arguments (enforced by the ForkingPickler). Assuming that str is a native type, the code should work... At the moment I am completely lost and have no idea what the problem might be.
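One way to check that assumption would be to look at the concrete type of the IDs and to pickle one of them by hand before the pool is started. A minimal check along these lines (reusing the pim_list built by get_pim_with_regex_list above) would likely surface the same TypeError without any multiprocessing involved:

import pickle

# The pool pickles every argument before sending it to a worker, so a manual
# round-trip of a single ID exercises the same machinery.
sample = pim_list[0]
print(type(sample))  # is it really a built-in str, or a subclass?
restored = pickle.loads(pickle.dumps(sample))
print(restored)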

As suggested, I have uploaded a minimal (sequential) working example here.

Is there a way to fix this, or at least to diagnose what the real problem is?

Tags: python, python-3.x, multiprocessing, python-multiprocessing

Solution
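
The traceback comes from _ForkingPickler.loads() while a worker is unpacking its task, i.e. while unpickling the arguments that pool.map pickled for it. That points at the elements of pim_list rather than at partial() itself: Entrez.read() does not return plain built-in strings but Biopython's own wrapper objects (such as Bio.Entrez.Parser.StringElement, a str subclass with a custom __new__), and the missing tag and attributes arguments in the error message match such a wrapper rather than a built-in str. A minimal sketch of a fix under that assumption is to cast the IDs to plain str before they are handed to the pool:

if __name__ == '__main__':
    keywords = ["keyword1", "keyword2"]
    # Cast each ID to a built-in str so it can be pickled and unpickled
    # by the worker processes without Biopython's wrapper class.
    pim_list = [str(pim) for pim in get_pim_with_regex_list(keywords)]
    regex = "keyword1 keyword2"

    worker_fn = partial(get_doi, regex=regex)
    pool = mp.Pool(mp.cpu_count())
    entries = pool.map(worker_fn, pim_list)
    pool.close()
    pool.join()

Equivalently, the cast could be done inside get_pim_with_regex_list (pims.append(str(pim))) so that every caller receives plain strings.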

