apache-spark - 在 PySpark 中将 CSV 文件从多个目录转换为镶木地板
问题描述
我有来自多个路径的 CSV 文件,这些路径不是 s3 存储桶中的父目录。所有表都具有相同的分区键。
s3的目录:
table_name_1/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.csv
table_name_2/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.csv
...
我需要将这些 csv 文件转换为 parquet 文件并将它们存储在另一个具有相同目录结构的 s3 存储桶中。
另一个s3的目录:
table_name_1/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.parquet
table_name_2/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.parquet
...
我有一个解决方案是遍历 s3 存储桶并找到 CSV 文件并将其转换为镶木地板并保存到另一个 S3 路径。我发现这种方式效率不高,因为我有一个循环并逐个文件进行转换。
我想利用火花库来提高效率。然后,我尝试了:
spark.read.csv('s3n://bucket_name/table_name_1/').write.partitionBy('partition_key_1', 'partition_key_2').parquet('s3n://another_bucket/table_name_1')
这种方式对每个表都有效,但为了进一步优化,我想将 table_name 作为参数,例如:
TABLE_NAMES = [table_name_1, table_name_2, ...]
spark.read.csv('s3n://bucket_name/{*TABLE_NAMES}/').write.partitionBy('partition_key_1', 'partition_key_2').parquet('s3n://another_bucket/{*TABLE_NAMES}')
谢谢
解决方案
提到的问题提供了一次读取多个文件的解决方案。该方法接受一个或多个路径,如此处spark.read.csv(...)
所示。对于读取文件,您可以应用相同的逻辑。虽然在写入时,Spark 会将所有给定的数据集/路径合并到一个 Dataframe 中。因此,如果不先应用自定义逻辑,就不可能从一个数据帧生成多个数据帧。所以总而言之,没有这样一种方法可以将初始数据帧直接提取到多个目录中,即.df.write.csv(*TABLE_NAMES)
好消息是 Spark 提供了一个专用函数input_file_name(),它返回当前记录的文件路径。您可以将它与 TABLE_NAMES 结合使用来过滤表名。
这是一种可能的未经测试的 PySpark 解决方案:
from pyspark.sql.functions import input_file_name
TABLE_NAMES = [table_name_1, table_name_2, ...]
source_path = "s3n://bucket_name/"
input_paths = [f"{source_path}/{t}" for t in TABLE_NAMES]
all_df = spark.read.csv(*input_paths) \
.withColumn("file_name", input_file_name()) \
.cache()
dest_path = "s3n://another_bucket/"
def write_table(table_name: string) -> None:
all_df.where(all_df["file_name"].contains(table_name))
.write
.partitionBy('partition_key_1','partition_key_2')
.parquet(f"{dest_path}/{table_name}")
for t in TABLE_NAMES:
write_table(t)
解释:
我们生成输入路径并将其存储到
input_paths
. 这将创建路径,例如:s3n://bucket_name/table1, s3n://bucket_name/table2 ... s3n://bucket_name/tableN
.然后我们将所有路径加载到一个数据框中,在其中添加一个名为 的新列
file_name
,这将保存每一行的路径。请注意,我们也在cache
这里使用,这很重要,因为我们len(TABLE_NAMES)
在以下代码中有多个操作。使用缓存会阻止我们一次又一次地加载数据源。接下来我们创建
write_table
负责保存给定表的数据的哪个。下一步是使用 过滤基于表名all_df["file_name"].contains(table_name)
,这将只返回包含列中值的table_name
记录file_name
。最后,我们像您一样保存过滤后的数据。在最后一步中,我们调用
write_table
.TABLE_NAMES
相关链接
推荐阅读
- html - 使用列宽和内联块的最少两行
- java - 管理 JMS 消息到多个服务器的传递
- python - Python:如何在指定日期和时间之后遍历一组文件和grep日志
- html - HTML 区别 .jpg .JPG
- spring - 无法使用带有 QueryDSL 和 Web 支持的 Spring Data Repository 查询 Set 中的字段
- serialization - PyGtk 序列化
- c# - 如何将 ContentControl 转换为 PlainTextContentControl/访问 .Text 属性?
- python - 如何将 python 字节对象传递给 C++ 扩展?
- javascript - Javascript - 间隔未清除
- unicode - 如何从 .txt 文件中删除 <96>