python - 如何在 Pyspark 中使用 Spacy?
问题描述
我正在尝试将 Spacy 与 Pyspark 一起使用。由于 Spacy 模型不可序列化,因此无法广播。我找到的解决方案是在工人端加载模型。但是为每个任务加载模型可能很耗时。我找到的解决方案是使用一个全局变量来存储加载的模型。并且由于“spark.python.worker.reuse”默认为True,模型将只在每个worker中加载一次。
accu = spark.sparkContext.accumulator(0)
spmodel = None
def get_sp():
global spmodel
if not spmodel:
accu.add(1)
spmodel = spacy.load("ja_core_news_sm")
return spmodel
def compute_ner(s_text:pd.Series) -> pd.DataFrame:
nlp = get_sp()
docs = nlp.pipe(s_text)
ents=[
[(ent.text, ent.label_) for ent in doc.ents]
for doc in docs
]
preds = []
# Some Instructions ..
return pd.DataFrame(preds)
udf_calculate = F.pandas_udf(compute_ner, returnType=return_types)
ner_df = df.repartition(16).withColumn('ner', udf_calculate('text'))
我使用了一个累加器来查看模型加载了多少次。我已经使用 8 个专用内核在本地模式下测试了该应用程序。我使用的 DataFrame 分为 16 个分区。我惊讶地发现累加器的值为 16。所以 spmodel 始终为 None。我有点困惑,有人可以解释发生了什么,或者我错过了什么。
解决方案
推荐阅读
- python - 如何平均字典中的值?
- json - 转换 apollo-datasource-rest 响应结构以适应已经定义的远程模式
- sas - SAS - 在行中组合相似的值,然后为不相似的值添加新变量
- networking - 格式错误的 IP Scapy
- ios - 直接从应用程序将图像保存在 iCloud 驱动器中
- google-apps-script - 自动排序由脚本移动的数据
- google-cloud-platform - GCP 上的 VPC 网络连接
- jquery - jQuery可以生成选择元素
- powerbi - 每个时间序列的计算列
- python - 执行我的 arp_spoofing python 脚本时出现回溯错误