apache-spark - Spark: check whether an input path containing a regex/glob pattern exists
Problem description
I am trying to read JSON files under paths computed with a regex, as shown below.
paths.par.foreach { path =>
  val pathWithRegex = s"${path}/*/${dateRegex}/"
  val jsonDF = sqlContext.read.json(pathWithRegex)
}
paths could be - hdfs://servername/data/a, hdfs://servername/data/b, hdfs://servername/data/c
dateRegex could be - 2020-05-*
Directories present in HDFS:
hdfs://servername/data/a/something/2020-05-11/file1
hdfs://servername/data/a/something/2020-05-12/file1
hdfs://servername/data/b/something/2020-05-11/file1
hdfs://servername/data/c/something/2020-06-11/file1
When I pass 2020-05-* as dateRegex, it throws an error for hdfs://servername/data/c/*/2020-05-*/ because the path does not exist. Is there a way to avoid the error and continue? I tried the checkDirExist method below, but it does not seem to work when the path contains a regex/glob pattern.
def checkDirExist(path: String, sc: SparkContext): Boolean = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val p = new Path(path)
  fs.exists(p)
}
paths.par.foreach { path =>
  val pathWithRegex = s"${path}/*/${dateRegex}/"
  if (checkDirExist(pathWithRegex, sc)) { // Doesn't work. Always false if pattern is in path string
    val jsonDF = sqlContext.read.json(pathWithRegex)
  }
}
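Note: FileSystem.exists checks a single literal path and never expands glob patterns, which is why checkDirExist always returns false once the path contains a wildcard. A minimal sketch of a common workaround (not from the original question) uses Hadoop's FileSystem.globStatus, which does expand globs:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext

// Returns true if the glob pattern matches at least one path.
// globStatus returns null for a non-glob path that does not exist,
// and an empty array for a glob with no matches, so guard for both.
def globExists(pattern: String, sc: SparkContext): Boolean = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val matches = fs.globStatus(new Path(pattern))
  matches != null && matches.nonEmpty
}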
Solution
Check the code below. It works for hdfs, s3 & the local filesystem.
Import the required libraries.
import scala.util.matching.Regex
import java.net.URI
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
An implicit conversion from Hadoop's RemoteIterator to a Scala Iterator, so that standard collection methods (toList, filter, map) can be used on directory listings:
implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
  case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
    override def hasNext: Boolean = remoteIterator.hasNext
    override def next(): T = remoteIterator.next()
  }
  wrapper(remoteIterator)
}
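With the implicit in scope, a RemoteIterator can be consumed like any Scala iterator. For example (a hypothetical snippet, assuming spark is the active SparkSession and reusing the directory layout below):

// listFiles returns RemoteIterator[LocatedFileStatus]; the implicit
// above wraps it so map/toList work directly.
val localFs = FileSystem.get(URI.create("file:///"), spark.sparkContext.hadoopConfiguration)
val fileNames = localFs.listFiles(new Path("/root/tmp/servername"), true)
  .map(_.getPath.getName)
  .toList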
Listing of the directories and files used in this example.
import sys.process._
scala> "tree /root/tmp/servername".!
/root/tmp/servername
└── data
├── a
│ └── something
│ ├── 2020-05-11
│ │ └── file1
│ └── 2020-05-12
│ └── file1
├── b
│ └── something
│ └── 2020-05-11
│ └── file1
└── c
└── something
└── 2020-06-11
└── file1
11 directories, 4 files
Get a FileSystem object.
def getFs(spark: SparkSession): String => FileSystem = (path: String) => {
  FileSystem.get(URI.create(path), spark.sparkContext.hadoopConfiguration)
}
val fs = getFs(spark)
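Because FileSystem.get dispatches on the URI scheme, the same helper resolves whichever backend the path points at (the paths here are illustrative):

val localFs = fs("file:///root/tmp/servername") // local filesystem
val hdfsFs = fs("hdfs://servername/data")       // HDFS
// val s3Fs = fs("s3a://some-bucket/data")      // S3, with the s3a connector on the classpath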
A function that recursively lists the files under a given path and returns the paths of those matching the regex pattern:
def exists(path: String, find: Regex)(fs: String => FileSystem) = {
  fs(path)
    .listFiles(new Path(path), true)
    .toList
    .filter(_.isFile)
    .map(_.getPath)
    .filter(c => find.findAllIn(c.toString).length != 0)
}
val fileList = exists("/root/tmp/servername","2020-05-*".r)(fs)
Final output:
scala> fileList.foreach(println)
file:/root/tmp/servername/data/b/something/2020-05-11/file1
file:/root/tmp/servername/data/a/something/2020-05-11/file1
file:/root/tmp/servername/data/a/something/2020-05-12/file1
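With the matched paths in hand, the JSON files themselves can be loaded without ever touching a non-existent directory; a minimal sketch, assuming spark is the active SparkSession:

// Read only the files that actually matched the pattern; prefixes with no
// matching dates (data/c here) contribute nothing, so no error is thrown.
val jsonDF = spark.read.json(fileList.map(_.toString): _*)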
To get the files' metadata, check this post - df.metadata