首页 > 解决方案 > 如何使用 spark toLocalIterator 从集群中写入本地文件系统中的单个文件

问题描述

我有一个 pyspark 作业,它将我的结果数据帧写入本地文件系统。目前它正在local模式下运行,所以我正在coalesce(1)获取一个文件,如下所示

file_format = 'avro'      # will be dynamic and so it will be like avro, json, csv, etc
df.coalesce.write.format(file_format).save('file:///pyspark_data/output')

但是我看到很多内存问题(OOM)并且需要更长的时间。所以我想用 master asyarn和 mode as来运行这个工作client。因此,要将结果写入df本地系统中的单个文件,我需要使用toLocalIteratorwhich yield Rows。如何将这些Rows 流式传输到所需格式的文件(json/avro/csv/parquet 等)?

file_format = 'avro'
for row in df.toLocalIterator():
    # write the data into a single file
    pass

标签: dataframeapache-sparkpyspark

解决方案


您会收到 OOM 错误,因为您尝试使用以下命令将所有数据检索到单个分区中:coalesce(1)

我不建议使用toLocalIterator,因为您将为每种格式重新编写自定义编写器,并且您不会进行并行编写。

您的第一个解决方案是一个很好的解决方案:

df.write.format(file_format).save('file:///pyspark_data/output')

如果您使用hadoop,您可以通过这种方式将所有数据检索到文件系统中:(它适用于csv,您可以尝试其他):

hadoop fs -getmerge <HDFS src> <FS destination>

推荐阅读