pyspark - 无法从 spark 数据框中导出数据
问题描述
我使用 spark NLP 解析了 500k 条推文作为测试。数据框看起来不错。我将数组转换为字符串。使用
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def array_to_string(my_list):
return '[' + ','.join([str(elem) for elem in my_list]) + ']'
array_to_string_udf = udf(array_to_string, StringType())
result = result.withColumn('token', array_to_string_udf(result["token"])).withColumn('ner', array_to_string_udf(result["ner"])).withColumn('embeddings', array_to_string_udf(result["embeddings"])).withColumn('ner_chunk', array_to_string_udf(result["ner_chunk"])).withColumn('document', array_to_string_udf(result["document"]))
数据框看起来不错。但是,每当我尝试将其转换为 pandas 时,将其导出为 csv 我都会收到以下错误
PythonException:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "C:\spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 584, in main
File "C:\spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\serializers.py", line 562, in read_int
length = stream.read(4)
File "C:\ProgramData\Anaconda3\lib\socket.py", line 669, in readinto
return self._sock.recv_into(b)
socket.timeout: timed out
这让我觉得 spark 不是在和 python 说话。有谁知道问题可能是什么?
解决方案
所有数据都加载到驱动程序的内存中
调用的效果与调用collecttoPandas
的效果基本相同。
将数据帧的内容写入 csv 的更好方法是直接使用 PySpark 的DataFrameWriter:
result.write.csv('my_result.csv')
编辑:可能与问题没有直接关系,但可以用原生 Spark 函数(lit、concat和concat_ws)替换 udf :
from pyspark.sql import functions as F
result.withColumn("token", F.concat(F.lit("["), F.concat_ws(",", "token"),F.lit("]")))....
用 Spark 函数替换 udf 将提高性能。
推荐阅读
- c++ - 类模板的成员函数的显式实例化声明是否会导致类模板的实例化?
- python - 使用 Bi-LSTM 和 glove 的多类分类
- amazon-web-services - AWS:使用 Batch / ECS 挂载模板磁盘
- flutter - 如何在不传递参数的情况下使用riverpod 系列提供程序
- arrays - 如何将文件中的数据读入C中的二维数组?
- r - 在 R 包中保存 R6 类模板
- java - 使用 Stream 将列表转换为另一个列表
- multithreading - 朱莉娅多线程不能扩展令人尴尬的并行工作
- javascript - 赛普拉斯 - find() 和 within() 方法之间的区别
- regex - 没有结束分隔符的匹配组的正则表达式