dataframe - 如何使用 spark toLocalIterator 从集群中写入本地文件系统中的单个文件
问题描述
我有一个 pyspark 作业,它将我的结果数据帧写入本地文件系统。目前它正在local
模式下运行,所以我正在coalesce(1)
获取一个文件,如下所示
file_format = 'avro' # will be dynamic and so it will be like avro, json, csv, etc
df.coalesce.write.format(file_format).save('file:///pyspark_data/output')
但是我看到很多内存问题(OOM)并且需要更长的时间。所以我想用 master asyarn
和 mode as来运行这个工作client
。因此,要将结果写入df
本地系统中的单个文件,我需要使用toLocalIterator
which yield Row
s。如何将这些Row
s 流式传输到所需格式的文件(json/avro/csv/parquet 等)?
file_format = 'avro'
for row in df.toLocalIterator():
# write the data into a single file
pass
解决方案
您会收到 OOM 错误,因为您尝试使用以下命令将所有数据检索到单个分区中:coalesce(1)
我不建议使用toLocalIterator
,因为您将为每种格式重新编写自定义编写器,并且您不会进行并行编写。
您的第一个解决方案是一个很好的解决方案:
df.write.format(file_format).save('file:///pyspark_data/output')
如果您使用hadoop,您可以通过这种方式将所有数据检索到文件系统中:(它适用于csv,您可以尝试其他):
hadoop fs -getmerge <HDFS src> <FS destination>
推荐阅读
- ios - 当应用程序从 Firebase 实时数据库接收数据时,如何显示 UIActivityIndicator
- python - 根据另一列的值从一列获取数据
- styled-components - 基于状态而不是道具的样式
- html - 通过导航显示背景
- configuration - 模块实例依赖于 Terraform 中相同模块的另一个实例
- mysql - MySQL 在邮件列表中组合人名
- matlab - MATLAB 在 UBUNTU 中写入文本文件“无效权限”
- fpga - 如何进行以 ACLK 为中心的数据传输
- android - Android:比较输入和从firebase检索的数据之间的数据
- java - 如何编写 Java 布尔比较方法