首页 > 解决方案 > 如何使用 Spark 读取文件夹文件?

问题描述

我有一个 hdfs 文件夹,在这个文件夹中有很多文件 txt。我想使用 spark 读取这些文件中的内容。

我的代码:

// Create spark session
    val spark = SparkSession.builder()
                  .master("spark://master:7077")
                  .appName("Indexing data to elasticsearch")
                  .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    // Read folder file
    val df:DataFrame = spark.read.text("hdfs://master:9000/user/file/shakespeare")

我想从 DataFrame 获取我文件夹中每个文件的内容。我应该怎么做?

标签: apache-sparkapache-spark-sqlhdfs

解决方案


这是 RDD 似乎比 DataFrame 更灵活的众多情况之一,因为 RDD 提供了wholeTextFiles方法。

wholeTextFiles方法基本上类似于textFile ,但不是读取所有文件中的每行输入,而是读取并存储每个文件的记录/PairRDD/键值对。结果 RDD 的架构将如下所示:

(path_to_file, file_contents)

(但是,使用此方法必须非常小心,因为您可能无法预测目录下文件内容的长度,因此可能会导致内存不足)。

因此,假设我们有许多文本文件(a.txtb.txt、...),每个文件都由它们在其内容中的字母命名,如下所示:

使用wholeTextFiles方法将产生以下 RDD 对(您可能希望摆脱每个文件的完整路径作为每个对的键以获得更好的可读性):

(hdfs:/.../.../a.txt,a aa aaa aaaa aaaaa)
(hdfs:/.../.../b.txt,b bb bbb bbbb bbbbb)
(hdfs:/.../.../c.txt,c cc ccc cccc ccccc)
(hdfs:/.../.../d.txt,d dd ddd dddd ddddd)
(hdfs:/.../.../e.txt,e ee eee eeee eeeee)

使用此方法后,根据您的问题,剩下要做的就是将结果 RDD 转换为 DataFrame,每条记录的数据将包含在两列(由您命名)中:file_namecontent

+---------+-------------------+
|file_name|            content|
+---------+-------------------+
|    a.txt|a aa aaa aaaa aaaaa|
|    b.txt|b bb bbb bbbb bbbbb|
|    c.txt|c cc ccc cccc ccccc|
|    d.txt|d dd ddd dddd ddddd|
|    e.txt|e ee eee eeee eeeee|
+---------+-------------------+

这可能发生在下面的代码片段中

// create a scala spark context to use the wholeTextFiles method
val sc = spark.sparkContext

// create an RDD where the full path of each file is the key 
// and the file's content is the value,
// and get rid of the full path of the file
val input = sc.wholeTextFiles("hdfs://path/to/folder/*")
.map(file => (file._1.split('/').last, file._2))

// convert the RDD to a DataFrame and explicitly name the columns
val input_df = spark.createDataFrame(input).toDF("file_name", "content")

推荐阅读