python - 是否可以在 pySpark 中修改输出数据文件名?
问题描述
简化案例。
鉴于我在目录中有 5 个输入文件data_directory
:
data_2020-01-01.txt,
data_2020-01-02.txt,
data_2020-01-03.txt,
data_2020-01-04.txt,
data_2020-01-05.txt
我将它们全部阅读到 pySpark RDD 并对它们执行一些不做任何改组的操作。
spark = SparkSession.builder.appName("Clean Data").getOrCreate()
sparkContext = spark.sparkContext
input_rdd = sparkContext.textFile("data_directory/*")
result = input_rdd.mapPartitions(lambda x: remove_corrupted_rows(x))
现在我想保存数据:
result.saveAsTextFile(
"results",
compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
)
我得到 5 个文件,每个文件都包含名称“part”。因此,我丢失了有关输出文件来自哪个输入文件的信息:
._SUCCESS.crc
.part-00000.gz.crc
.part-00001.gz.crc
.part-00002.gz.crc
.part-00003.gz.crc
.part-00004.gz.crc
_SUCCESS
part-00000.gz
part-00001.gz
part-00002.gz
part-00003.gz
part-00004.gz
在这种情况下,是否有保留输入文件名或引入我自己的命名模式?
预期的期望结果:
._SUCCESS.crc
.data_2020-01-01.gz.crc
.data_2020-01-02.gz.crc
.data_2020-01-03.gz.crc
.data_2020-01-04.gz.crc
.data_2020-01-05.crc
_SUCCESS
data_2020-01-01.gz
data_2020-01-02.gz
data_2020-01-03.gz
data_2020-01-04.gz
data_2020-01-05.gz
解决方案
您可以使用pyspark.sql.functions.input_file_name()
(此处的文档https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=functions#pyspark.sql.functions.input_file_name)然后按列对数据框进行分区创建的。
这样,5 个输入文件应该为您提供一个具有 5 个不同值的分类列,并且对其进行分区应该将您的输出分成 5 个部分。
或者,如果您希望有一个完整的命名模式,然后在功能上拆分input_file_name()
列上的数据框(这里分为 5 个数据框),重新分区(例如使用 1 到 1 coalesce(1)
),然后使用自定义逻辑(例如 dict 映射或通过提取文件名)保存从列并解析DataFrameWriter.csv()
为名称)。
注意:当更改为 1 个分区时,请确保数据适合您的内存!
推荐阅读
- elasticsearch - Elastic.elasticsearch 角色和 Ansible Tower 动态清单
- php - jodit Yii2 在上传时更改文件名
- php - Guzzle 不支持带有自定义端口的端点吗?
- python - 如何从 .txt 列表中获取数字并将结果保存在另一个 .txt 中?
- android - macOS Catalina:repo 错误:由于获取错误而退出同步
- java - SnakeYaml 获取堆叠的密钥
- laravel - Symfony\Component\Debug\Exception\FatalErrorException' 带有消息'找不到类'主题'
- kubernetes - 在 Pod 中设置 net.ipv4.tcp_timestamps=0
- python - 需要在 pyspark 中读取最大日期文件夹文件 - Databricks
- java - 存储配置数据的最佳实践