首页 > 解决方案 > 如何在 Databricks dbfs 中列出文件键 **without** dbutils

问题描述

显然dbutils 不能在命令行 spark-submits 中使用,您必须为此使用 Jar Jobs,但由于其他要求,我必须使用 spark-submit 样式的作业,但仍然需要在 dbfs 中列出和迭代文件键以就将哪些文件用作进程的输入做出一些决定...

使用 scala,我可以使用 spark 或 hadoop 中的什么库来检索dbfs:/filekeys特定模式的列表?

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

def ls(sparkSession: SparkSession, inputDir: String): Seq[String] = {
  println(s"FileUtils.ls path: $inputDir")
  val path = new Path(inputDir)
  val fs = path.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
  val fileStatuses = fs.listStatus(path)
  fileStatuses.filter(_.isFile).map(_.getPath).map(_.getName).toSeq
}

使用上述方法,如果我传入部分键前缀,例如dbfs:/mnt/path/to/folder在所述“文件夹”中存在以下键:

dbfs:/mnt/path/to/folder is not a directory当它击中时 我得到val path = new Path(inputDir)

标签: apache-sparkhadoopfilesystemsazure-databricks

解决方案


需要使用 SparkSession 来完成。

我们是这样做的:

import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

def getFileSystem(sparkSession: SparkSession): FileSystem =
    FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)

def listContents(sparkSession: SparkSession, dir: String): Seq[String] = {
  getFileSystem(sparkSession).listStatus(new path(dir)).toSeq.map(_.getPath).map(_.getName)
}

推荐阅读