首页 > 解决方案 > 如何在 Pyspark 中使用 Spacy?

问题描述

我正在尝试将 Spacy 与 Pyspark 一起使用。由于 Spacy 模型不可序列化,因此无法广播。我找到的解决方案是在工人端加载模型。但是为每个任务加载模型可能很耗时。我找到的解决方案是使用一个全局变量来存储加载的模型。并且由于“spark.python.worker.reuse”默认为True,模型将只在每个worker中加载一次。

accu = spark.sparkContext.accumulator(0)
spmodel = None
def get_sp():
    global spmodel
    if not spmodel:
        accu.add(1)
        spmodel = spacy.load("ja_core_news_sm")
    return spmodel

def compute_ner(s_text:pd.Series) -> pd.DataFrame:
    nlp = get_sp()
    docs = nlp.pipe(s_text)
    ents=[
         [(ent.text, ent.label_) for ent in doc.ents]
         for doc in docs
         ]
    preds = []
    # Some Instructions .. 
    return pd.DataFrame(preds)
udf_calculate = F.pandas_udf(compute_ner, returnType=return_types)
ner_df = df.repartition(16).withColumn('ner', udf_calculate('text'))

我使用了一个累加器来查看模型加载了多少次。我已经使用 8 个专用内核在本地模式下测试了该应用程序。我使用的 DataFrame 分为 16 个分区。我惊讶地发现累加器的值为 16。所以 spmodel 始终为 None。我有点困惑,有人可以解释发生了什么,或者我错过了什么。

标签: pythonapache-sparkpysparknlpspacy

解决方案


推荐阅读