Spark: check whether an input path with a regex pattern exists

Problem description

I am trying to read JSON files under paths built from a regex pattern, as shown below.

paths.par.foreach { path =>
    val pathWithRegex = s"${path}/*/${dateRegex}/"
    val jsonDF = sqlContext.read.json(pathWithRegex)
}
paths could be - hdfs://servername/data/a, hdfs://servername/data/b, hdfs://servername/data/c
dateRegex could be - 2020-05-*
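For reference, hypothetical Scala definitions of those two values as described above:

// Hypothetical example values matching the description above.
val paths = Seq("hdfs://servername/data/a", "hdfs://servername/data/b", "hdfs://servername/data/c")
val dateRegex = "2020-05-*"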

Directories present in HDFS:
hdfs://servername/data/a/something/2020-05-11/file1
hdfs://servername/data/a/something/2020-05-12/file1
hdfs://servername/data/b/something/2020-05-11/file1
hdfs://servername/data/c/something/2020-06-11/file1

When I pass 2020-05-* as dateRegex, it throws an error for hdfs://servername/data/c/*/2020-05-*/ because that path does not exist. Is there a way to skip the missing paths and continue without an error? I tried the checkDirExist method below, but it does not seem to work when the path contains a regex/glob pattern.

import org.apache.spark.SparkContext
import org.apache.hadoop.fs.{FileSystem, Path}

def checkDirExist(path: String, sc: SparkContext): Boolean = {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val p = new Path(path)
    fs.exists(p) // checks a literal path only; glob patterns are not expanded
}

paths.par.foreach { path =>
    val pathWithRegex = s"${path}/*/${dateRegex}/"
    if (checkDirExist(pathWithRegex, sc)) { // Doesn't work: always false when the path string contains a pattern
        val jsonDF = sqlContext.read.json(pathWithRegex)
    }
}
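A pattern-aware check is possible with Hadoop's FileSystem.globStatus, which expands glob patterns instead of testing a literal path; a minimal sketch, reusing sc and pathWithRegex from above:

import org.apache.spark.SparkContext
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: globStatus expands the glob and returns the matching statuses,
// so a non-empty result means at least one concrete path exists.
def globExists(pattern: String, sc: SparkContext): Boolean = {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val matches = fs.globStatus(new Path(pattern))
    matches != null && matches.nonEmpty // globStatus may return null when nothing matches
}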

Tags: apache-spark, hadoop, hdfs

Solution


Check the code below. It works for HDFS, S3 and the local filesystem.

Import the required libraries.

import scala.util.matching.Regex
import java.net.URI
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}

Implicit conversion from Hadoop's RemoteIterator to a Scala Iterator.

implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
    case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
      override def hasNext: Boolean = remoteIterator.hasNext
      override def next(): T = remoteIterator.next()
    }
    wrapper(remoteIterator)
}
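A quick illustration of what the implicit buys (a sketch assuming a spark-shell session and the sample local path used below): collection methods such as map and toList become available directly on the RemoteIterator returned by listFiles.

// Sketch: with the implicit in scope, the Hadoop RemoteIterator behaves like a Scala Iterator.
val localFs = FileSystem.get(URI.create("file:///"), spark.sparkContext.hadoopConfiguration)
localFs
    .listFiles(new Path("/root/tmp/servername"), true) // RemoteIterator[LocatedFileStatus]
    .map(_.getPath.getName)                            // works via the implicit conversion
    .toList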

List of the directories and files available.

import sys.process._
scala> "tree /root/tmp/servername".!
/root/tmp/servername
└── data
    ├── a
    │   └── something
    │       ├── 2020-05-11
    │       │   └── file1
    │       └── 2020-05-12
    │           └── file1
    ├── b
    │   └── something
    │       └── 2020-05-11
    │           └── file1
    └── c
        └── something
            └── 2020-06-11
                └── file1

11 directories, 4 files

Get a FileSystem object.

def getFs(spark:SparkSession): String => FileSystem = (path: String) => {
    FileSystem.get(URI.create(path),spark.sparkContext.hadoopConfiguration)
}
val fs = getFs(spark)
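Since the FileSystem is resolved from the URI scheme of the path, the same helper should work for local, HDFS and S3 paths; a hedged sketch (host and bucket names are placeholders):

// Hypothetical examples: the scheme in the path decides which FileSystem implementation is returned.
val localFs = fs("file:///root/tmp/servername") // LocalFileSystem
val hdfsFs  = fs("hdfs://servername/data")      // DistributedFileSystem
// val s3Fs = fs("s3a://some-bucket/data")      // S3AFileSystem, requires hadoop-aws on the classpath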

Function that lists the files available under the given path and returns the paths of the files matching the regex pattern.

def exists(path: String,find:Regex)(fs: String => FileSystem) = { 
    fs(path)
    .listFiles(new Path(path),true)
    .toList.filter(_.isFile)
    .map(_.getPath)
    .filter(c => find.findAllIn(c.toString).length != 0)   
}
val fileList = exists("/root/tmp/servername","2020-05-*".r)(fs)

Final output

scala> fileList.foreach(println)
file:/root/tmp/servername/data/b/something/2020-05-11/file1
file:/root/tmp/servername/data/a/something/2020-05-11/file1
file:/root/tmp/servername/data/a/something/2020-05-12/file1
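To tie this back to the original question, the matched paths can then be handed straight to the JSON reader; a sketch (spark.read.json accepts multiple path strings):

// Sketch: read only the paths that actually matched the pattern.
if (fileList.nonEmpty) {
    val jsonDF = spark.read.json(fileList.map(_.toString): _*)
    jsonDF.printSchema()
}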

To get the metadata of the files, check this post - df.metadata

