apache-spark - 如何使用 Spark 读取文件夹文件?
问题描述
我有一个 hdfs 文件夹,在这个文件夹中有很多文件 txt。我想使用 spark 读取这些文件中的内容。
我的代码:
// Create spark session
val spark = SparkSession.builder()
.master("spark://master:7077")
.appName("Indexing data to elasticsearch")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
// Read folder file
val df:DataFrame = spark.read.text("hdfs://master:9000/user/file/shakespeare")
我想从 DataFrame 获取我文件夹中每个文件的内容。我应该怎么做?
解决方案
这是 RDD 似乎比 DataFrame 更灵活的众多情况之一,因为 RDD 提供了wholeTextFiles方法。
wholeTextFiles方法基本上类似于textFile ,但它不是读取所有文件中的每行输入,而是读取并存储每个文件的记录/PairRDD/键值对。结果 RDD 的架构将如下所示:
(path_to_file, file_contents)
(但是,使用此方法必须非常小心,因为您可能无法预测目录下文件内容的长度,因此可能会导致内存不足)。
因此,假设我们有许多文本文件(a.txt、b.txt、...),每个文件都由它们在其内容中的字母命名,如下所示:
使用wholeTextFiles方法将产生以下 RDD 对(您可能希望摆脱每个文件的完整路径作为每个对的键以获得更好的可读性):
(hdfs:/.../.../a.txt,a aa aaa aaaa aaaaa)
(hdfs:/.../.../b.txt,b bb bbb bbbb bbbbb)
(hdfs:/.../.../c.txt,c cc ccc cccc ccccc)
(hdfs:/.../.../d.txt,d dd ddd dddd ddddd)
(hdfs:/.../.../e.txt,e ee eee eeee eeeee)
使用此方法后,根据您的问题,剩下要做的就是将结果 RDD 转换为 DataFrame,每条记录的数据将包含在两列(由您命名)中:file_name和content。
+---------+-------------------+
|file_name| content|
+---------+-------------------+
| a.txt|a aa aaa aaaa aaaaa|
| b.txt|b bb bbb bbbb bbbbb|
| c.txt|c cc ccc cccc ccccc|
| d.txt|d dd ddd dddd ddddd|
| e.txt|e ee eee eeee eeeee|
+---------+-------------------+
这可能发生在下面的代码片段中
// create a scala spark context to use the wholeTextFiles method
val sc = spark.sparkContext
// create an RDD where the full path of each file is the key
// and the file's content is the value,
// and get rid of the full path of the file
val input = sc.wholeTextFiles("hdfs://path/to/folder/*")
.map(file => (file._1.split('/').last, file._2))
// convert the RDD to a DataFrame and explicitly name the columns
val input_df = spark.createDataFrame(input).toDF("file_name", "content")
推荐阅读
- reactjs - 如何通过 Typescript React 应用程序中的脚本访问库?
- angular - 无法匹配任何路由。URL 段:“登录”
- python-3.x - 使用 pandas 数据框根据 A 列和 B 列中的值填充 C 列
- r - R将字符和向量列表转换为data.table of characters
- javascript - 如何格式化值数组以形成谷歌图表直方图的分布?
- python-3.x - 如果未提供,则从 SQLAlchemy 查询的“filter_by”选项中删除 kwargs 的最佳方法
- generics - 为什么在实现 trait 时需要重复我的泛型类型约束?
- javascript - Meteor.js 聊天 - scrollToBottom(); 仅适用于发送选项卡
- angular7 - 如何在Angular 7中返回带有参数的路由
- angular - Mat-toolbar 不将组件变量绑定到模板