首页 > 解决方案 > 为多个未格式化文件触发 WholeTextFiles 的更快方法。PySpark

问题描述

我正在使用 spark 读取多个小文件。每个文件都是客户特定的格式,并包含多个表(每个表都有不同的结构)。我们创建了一个 python 解析器,它可以工作并处理给定路径的分区。让我通过模式来解释:

folder
|- file_number=0001
   |- year=2019
      |- month=10
         |- day=21
            |- hour=17
               |- file.txt
|- file_number=0002
   |- year=2019
      |- month=10
         |- day=21
            |- hour=17
               |- file.txt
etc
.
.
.

所以天真的方法是:

sc.wholeTextFiles('/path/to/file_number=*/year=*/month=*/day=*/hour=*/*.txt')\ # This is a pair (path, file Content)
  .flatMap(lambda x: parser(x[1], x[0]))\ # This is the parser function. Is plain python and works fast. We use the path to pick up the partitioning. The parser returns a list of tuples that's why flatMap
  .foldByKey([], lambda x, y: x + y)      # The key is the table name and the value is the data as a list of dicts in a tabular-like style

转换.wholeTextFiles('/path/to/file_number=*/year=*/month=*/day=*/hour=*/*.txt')需要大量的时间,考虑到其余部分花费的时间并不多。

这个博客为止,问题可能是一些递归调用,所以最好先列出所有文件,然后读取每个文件。我无法按照FileSystem.open(path)链接中的建议使用 Hadoop,因为我正在研究 Azure Data Lake Gen2。但是,列出所有使用的文件dbutlis.fs确实很快。

所以问题是:如何使用这样的列表来并行读取和解析文件?. 问题是wholeTextFile不接受列表作为参数。我已经尝试了以下所有方法:

list_of_all_files_paths = dbutils.someCode()

# Attempt 1: Type mismatch
rdd = sc.wholeTextFile(list_of_all_files_paths)

# Attempt 2: Works but all partitiong info is lost
rdd = spark.read.text(list_of_all_files_paths, wholetext=True)

# Attempt 3:  Takes a lot of time too
rdd = spark.read.text('path/to/')

# Attempt 3: This is the most succesfull approach... but looks sooo bad, and is not very fast neither...
rdd = sc.emptyRDD()
for path in list_of_all_files_paths:
  newRDD = sc.wholeTextFiles(path)
  rdd    = rdd.union(newRDD)

标签: apache-sparkpysparkazure-databricks

解决方案


正如@jxc 在评论中回答的那样。解决方案非常简单:

rdd = sc.wholeTextFile(','.join(list_of_all_files_paths))

事实证明,表示路径列表的字符串是有效输入。由于数量仍然很高,I/O但至少上市部分已经变得非常少


推荐阅读