首页 > 解决方案 > 在 PySpark 中将 CSV 文件从多个目录转换为镶木地板

问题描述

我有来自多个路径的 CSV 文件,这些路径不是 s3 存储桶中的父目录。所有表都具有相同的分区键。

s3的目录:

table_name_1/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.csv
table_name_2/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.csv
...

我需要将这些 csv 文件转换为 parquet 文件并将它们存储在另一个具有相同目录结构的 s3 存储桶中。

另一个s3的目录:

table_name_1/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.parquet
table_name_2/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.parquet
...

我有一个解决方案是遍历 s3 存储桶并找到 CSV 文件并将其转换为镶木地板并保存到另一个 S3 路径。我发现这种方式效率不高,因为我有一个循环并逐个文件进行转换。

我想利用火花库来提高效率。然后,我尝试了:

spark.read.csv('s3n://bucket_name/table_name_1/').write.partitionBy('partition_key_1', 'partition_key_2').parquet('s3n://another_bucket/table_name_1')

这种方式对每个表都有效,但为了进一步优化,我想将 table_name 作为参数,例如:

TABLE_NAMES = [table_name_1, table_name_2, ...]
spark.read.csv('s3n://bucket_name/{*TABLE_NAMES}/').write.partitionBy('partition_key_1', 'partition_key_2').parquet('s3n://another_bucket/{*TABLE_NAMES}')

谢谢

标签: apache-sparkpysparkapache-spark-sqlparquetdata-partitioning

解决方案


提到的问题提供了一次读取多个文件的解决方案。该方法接受一个或多个路径,如此spark.read.csv(...)所示。对于读取文件,您可以应用相同的逻辑。虽然在写入时,Spark 会将所有给定的数据集/路径合并到一个 Dataframe 中。因此,如果不先应用自定义逻辑,就不可能从一个数据帧生成多个数据帧。所以总而言之,没有这样一种方法可以将初始数据帧直接提取到多个目录中,即.df.write.csv(*TABLE_NAMES)

好消息是 Spark 提供了一个专用函数input_file_name(),它返回当前记录的文件路径。您可以将它与 TABLE_NAMES 结合使用来过滤表名。

这是一种可能的未经测试的 PySpark 解决方案:

from pyspark.sql.functions import input_file_name 

TABLE_NAMES = [table_name_1, table_name_2, ...]

source_path = "s3n://bucket_name/"
input_paths = [f"{source_path}/{t}" for t in TABLE_NAMES]

all_df = spark.read.csv(*input_paths) \
              .withColumn("file_name", input_file_name()) \
              .cache()

dest_path = "s3n://another_bucket/"

def write_table(table_name: string) -> None:
   all_df.where(all_df["file_name"].contains(table_name))
     .write
     .partitionBy('partition_key_1','partition_key_2')
     .parquet(f"{dest_path}/{table_name}")

for t in TABLE_NAMES:
   write_table(t)

解释:

  • 我们生成输入路径并将其存储到input_paths. 这将创建路径,例如:s3n://bucket_name/table1, s3n://bucket_name/table2 ... s3n://bucket_name/tableN.

  • 然后我们将所有路径加载到一个数据框中,在其中添加一个名为 的新列file_name,这将保存每一行的路径。请注意,我们也在cache这里使用,这很重要,因为我们len(TABLE_NAMES)在以下代码中有多个操作。使用缓存会阻止我们一次又一次地加载数据源。

  • 接下来我们创建write_table负责保存给定表的数据的哪个。下一步是使用 过滤基于表名all_df["file_name"].contains(table_name),这将只返回包含列中值的table_name记录file_name。最后,我们像您一样保存过滤后的数据。

  • 在最后一步中,我们调用write_table.TABLE_NAMES

相关链接

如何在一次加载中导入多个 csv 文件?

在 PySpark 中获取序列文件格式文件的 HDFS 文件路径


推荐阅读