首页 > 解决方案 > Sparklyr: List contents of directory in R using invoke methods

问题描述

Unable to find a sparklyr built in for listing the contents of a directory via Spark, I am attempting to use invoke:

sc <- spark_connect(master = "yarn", config=config)
path <- 'gs:// ***path to bucket on google cloud*** '
spath <- sparklyr::invoke_new(sc, 'org.apache.hadoop.fs.Path', path) 
fs <- sparklyr::invoke(spath, 'getFileSystem')
list <- sparklyr:: invoke(fs, 'listLocatedStatus') 
Error: java.lang.Exception: No matched method found for class org.apache.hadoop.fs.Path.getFileSystem
    at sparklyr.Invoke.invoke(invoke.scala:134)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
    at sparklyr.StreamHandler.read(stream.scala:66) ...

Note: Are there guidelines for reproducible examples with distributed code? I don't know how to make an example others could follow, given I am running against a particular Spark environment.

标签: rapache-sparksparklyr

解决方案


getFileSystem方法 org.apache.hadoop.conf.Configuration对象作为第一个参数:

public FileSystem getFileSystem(Configuration conf)
                     throws IOException

返回拥有此路径的文件系统。

参数

conf- 解析文件系统时使用的配置

所以检索FileSystem实例的代码应该或多或少像这样:

# Retrieve Spark's Hadoop configuration
hconf <- sc %>% spark_context() %>% invoke("hadoopConfiguration")
fs <- sparklyr::invoke(spath, 'getFileSystem', hconf)

另外listLocatedStatus 需要Path

public org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
                                                                     throws FileNotFoundException,
                                                                            IOException

PathPathFilter(注意这个实现是protected):

public org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
                                                                    throws FileNotFoundException,
                                                                            IOException

因此,如果您想按上述方式构建代码,则必须至少提供一条路径

sparklyr:: invoke(fs, "listLocatedStatus", spath)

在实践中,直接获取可能更容易FileSystem

fs <- invoke_static(sc, "org.apache.hadoop.fs.FileSystem", "get",  hconf)

并使用globStatus

lls <- invoke(fs, "globStatus", spath)

wherespath是带有通配符的路径,例如:

sparklyr::invoke_new(sc, 'org.apache.hadoop.fs.Path', "/some/path/*")

结果将是一个 R list,它可以很容易地迭代:

lls  %>%
    purrr::map(function(x) invoke(x, "getPath") %>% invoke("toString"))

学分

如何在 Spark Scala shell 中列出 HDFS 位置中的所有 csv 文件通过@jaime

备注

  • 一般来说,如果您与非平凡的 Java API 交互,那么用 Java 或 Scala 编写代码并提供最小的 R 接口会更有意义。
  • 对于与特定文件对象存储的交互,使用专用包可能更容易。对于 Google Cloud Storage,您可以查看googleCloudStorageR.

推荐阅读