首页 > 解决方案 > joblib 结果因返回值而异

问题描述

我必须使用 Spacy 分析大型文本数据集。该数据集包含大约 120000 条记录,典型文本长度约为 1000 个单词。Lemmatizing 文本需要相当长的时间,所以我寻找减少时间的方法。这篇文章描述了如何使用 joblib 加速计算。效果相当不错:16 个内核将 CPU 时间减少了 10 倍,超线程将 CPU 时间减少了 7%。

最近我意识到我想计算文档之间的相似性,并可能在以后对文档进行更多分析。因此,我决定为所有文档生成一个 Spacy 文档实例 (<class 'spacy.tokens.doc.Doc'>),然后将其用于分析(词形还原、矢量化等)。这就是麻烦开始的地方。

并行词形还原器的分析发生在以下函数中:

def lemmatize_pipe(doc):
    lemma_list = [str(tok.lemma_).lower() for tok in doc
                  if tok.is_alpha]

    return lemma_list

(完整的演示代码可以在文章末尾找到)。我所要做的就是返回doc而不是lemma_list我已经准备好了。我想。

def lemmatize_pipe(doc):

    return doc

顺序版本运行时间为 73 秒,并行版本返回lemma_list需要 7 秒,而版本返回doc运行时间为 127 秒:是顺序版本的两倍。完整代码如下。

import time
import pandas as pd
from joblib import Parallel, delayed
import gensim.downloader as api
import spacy

from pdb import set_trace as breakpoint

# Initialize spacy with the small english language model
nlp = spacy.load('en', disable=['parser', 'ner', 'tagger'])
nlp.add_pipe(nlp.create_pipe('sentencizer'))

# Import the dataset and get the text
dataset = api.load("text8")
data = [d for d in dataset]
doc_requested = False

print(len(data), 'documents in original data')

df_data = pd.DataFrame(columns=['content'])
df_data['content'] = df_data['content'].astype(str)

# Content is a list of words, convert is to strings
for doc in data:
    sentence = ' '.join([word for word in doc])
    df_data.loc[len(df_data)] = [sentence]

### === Sequential processing ===
def lemmatize(text):
    doc = nlp(text)
    lemma_list = [str(tok.lemma_).lower() for tok in doc
                  if tok.is_alpha]

    return doc if doc_requested else lemma_list

cpu = time.time()

df_data['sequential'] = df_data['content'].apply(lemmatize)
print('\nSequential processing in {:.0f} seconds'.format(time.time() - cpu))
df_data.head(3)

### === Parallel processing ===
def lemmatize_pipe(doc):
    lemma_list = [str(tok.lemma_).lower() for tok in doc
                  if tok.is_alpha]

    return doc if doc_requested else lemma_list

def chunker(iterable, total_length, chunksize):
    return (iterable[pos: pos + chunksize] for pos in range(0, total_length,
                                                           chunksize))
def process_chunk(texts):
    preproc_pipe = []
    for doc in nlp.pipe(texts, batch_size=20):
        preproc_pipe.append(lemmatize_pipe(doc))

    return preproc_pipe

def preprocess_parallel(data, chunksize):
    executor = Parallel(n_jobs=31, backend='multiprocessing', prefer="processes")
    do = delayed(process_chunk)
    tasks = (do(chunk) for chunk in chunker(data, len(data), chunksize=chunksize))
    result = executor(tasks)
    flattened = [item for sublist in result for item in sublist]

    return flattened

cpu = time.time()
df_data['parallel'] = preprocess_parallel(df_data['content'], chunksize=1)
print('\nParallel processing in {:.0f} seconds'.format(time.time() - cpu))

我已经搜索并尝试了各种方法,但找不到解决方案。最后,我决定将相似性与引理一起计算,但这是一种解决方法。时间增加的真正原因是什么?有没有办法在不浪费太多时间的情况下获取文档?

标签: pythonspacyjoblib

解决方案


腌制doc非常大,包含大量重建文档本身不需要的数据,包括整个模型词汇。使用doc.to_bytes()将是一项重大改进,您可以通过使用exclude排除不需要的数据来进一步改进它,例如doc.tensor

data = doc.to_bytes(exclude=["tensor"])
...
reloaded_doc = Doc(nlp.vocab)
reloaded_doc.from_bytes(data)

比较:

doc = nlp("test")
len(pickle.dumps(doc))                              # 1749721
len(pickle.dumps(doc.to_bytes()))                   # 750
len(pickle.dumps(doc.to_bytes(exclude=["tensor"]))) # 316

您也可以使用doc.to_array()而不是doc.to_bytes()仅导出您需要的注释图层,但是从数组中重新加载文档稍微复杂一些。

看:


推荐阅读