apache-spark - 如何在 Databricks dbfs 中列出文件键 **without** dbutils
问题描述
显然dbutils 不能在命令行 spark-submits 中使用,您必须为此使用 Jar Jobs,但由于其他要求,我必须使用 spark-submit 样式的作业,但仍然需要在 dbfs 中列出和迭代文件键以就将哪些文件用作进程的输入做出一些决定...
使用 scala,我可以使用 spark 或 hadoop 中的什么库来检索dbfs:/filekeys
特定模式的列表?
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
def ls(sparkSession: SparkSession, inputDir: String): Seq[String] = {
println(s"FileUtils.ls path: $inputDir")
val path = new Path(inputDir)
val fs = path.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
val fileStatuses = fs.listStatus(path)
fileStatuses.filter(_.isFile).map(_.getPath).map(_.getName).toSeq
}
使用上述方法,如果我传入部分键前缀,例如dbfs:/mnt/path/to/folder
在所述“文件夹”中存在以下键:
/mnt/path/to/folder/file1.csv
/mnt/path/to/folder/file2.csv
dbfs:/mnt/path/to/folder is not a directory
当它击中时
我得到val path = new Path(inputDir)
解决方案
需要使用 SparkSession 来完成。
我们是这样做的:
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
def getFileSystem(sparkSession: SparkSession): FileSystem =
FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
def listContents(sparkSession: SparkSession, dir: String): Seq[String] = {
getFileSystem(sparkSession).listStatus(new path(dir)).toSeq.map(_.getPath).map(_.getName)
}
推荐阅读
- java - Java 和 Golang http 性能比较
- xml - 运行与数据转换相关的Java代码时出现xsl代码问题
- json - XSD 中的所有限制在 Avro 中都可用吗?
- c++ - 如何查找导致结果不一致的代码
- python - 基于标签的共现图像聚类
- c# - ASP.NET Core 3.0 端点路由不适用于默认路由
- gradle - 在 packageRunBuild Segmentation Fault 上 Gradle 构建失败
- python - C++ python模块(基于Pybind11)导入报错:ModuleNotFoundError
- javascript - 如何正确创建班级成员?
- c# - 如何在 .netcore-3.0 中测试标头属性?