regex - 如何让 Spark 会话递归读取所有文件?
问题描述
显示存储 JSON 文件的目录:
$ tree -d try/
try/
├── 10thOct_logs1
├── 11thOct
│ └── logs2
└── Oct
└── 12th
└── logs3
任务是使用 读取所有日志SparkSession
。
有没有一种优雅的方法可以递归地读取目录中的所有文件,然后是子目录?
我尝试过的很少有命令容易导致无意排除。
spark.read.json("file:///var/foo/try/<exp>")
+----------+---+-----+-------+
| <exp> -> | * | */* | */*/* |
+----------+---+-----+-------+
| logs1 | y | y | n |
| logs2 | n | y | y |
| logs3 | n | n | y |
+----------+---+-----+-------+
您可以在上表中看到,三个表达式中没有一个同时匹配所有目录(位于 3 个不同的深度)。坦率地说,我没想到10thOct_logs1
在使用第三个表达式时会排除*/*/*
。
这使我得出结论,与最后一个表达式匹配的任何文件或目录路径/
都被视为完全匹配,而其他所有内容都被忽略。
解决方案
更新
Spark 3 中引入了一个从嵌套文件夹中读取的新选项recursiveFileLookup
:
spark.read.option("recursiveFileLookup", "true").json("file:///var/foo/try")
对于旧版本,或者,您可以使用 HadooplistFiles
以递归方式列出所有文件路径,然后将它们传递给 Spark 读取:
import org.apache.hadoop.fs.{Path}
val conf = sc.hadoopConfiguration
// get all file paths
val fromFolder = new Path("file:///var/foo/try/")
val logfiles = fromFolder.getFileSystem(conf).listFiles(fromFolder, true)
var files = Seq[String]()
while (logfiles.hasNext) {
// one can filter here some specific files
files = files :+ logfiles.next().getPath().toString
}
// read multiple paths
val df = spark.read.csv(files: _*)
df.select(input_file_name()).distinct().show(false)
+-------------------------------------+
|input_file_name() |
+-------------------------------------+
|file:///var/foo/try/11thOct/log2.csv |
|file:///var/foo/try/10thOct_logs1.csv|
|file:///var/foo/try/Oct/12th/log3.csv|
+-------------------------------------+
推荐阅读
- python - post_save 在 User 和 Profile 模型中具有 onetoone 关系时导致 IntegrityError
- java - 如何在工具栏中为每个片段设置标题和后退箭头按钮?
- python - 在 django 中找不到查看功能
- javascript - 读取 Dom ID Jquery 方法
- grafana - Prometheus:从标签值中提取子字符串?
- abap - 没有循环计算总计和小计?
- debugging - 从用户空间访问内存时linux hw_breakpoint不起作用
- angular - 提交后反应式表单Angular删除验证
- python - 使用 OpenCV 进行轮廓识别
- python - 对字符串列表进行数字排序