
Sparklyr: List contents of directory in R using invoke methods


Unable to find a sparklyr built-in for listing the contents of a directory via Spark, I am attempting to use invoke:

sc <- spark_connect(master = "yarn", config=config)
path <- 'gs:// ***path to bucket on google cloud*** '
spath <- sparklyr::invoke_new(sc, 'org.apache.hadoop.fs.Path', path) 
fs <- sparklyr::invoke(spath, 'getFileSystem')
list <- sparklyr::invoke(fs, 'listLocatedStatus') 
Error: java.lang.Exception: No matched method found for class org.apache.hadoop.fs.Path.getFileSystem
    at sparklyr.Invoke.invoke(invoke.scala:134)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
    at sparklyr.StreamHandler.read(stream.scala:66) ...

Note: Are there guidelines for reproducible examples with distributed code? I don't know how to make an example others could follow, given I am running against a particular Spark environment.


Solution

  • The getFileSystem method takes an org.apache.hadoop.conf.Configuration object as its first argument:

    public FileSystem getFileSystem(Configuration conf)
                         throws IOException
    

    Return the FileSystem that owns this Path.

    Parameters:

    conf - the configuration to use when resolving the FileSystem

    So the code to retrieve a FileSystem instance should look more or less like this:

    # Retrieve Spark's Hadoop configuration
    hconf <- sc %>% spark_context() %>% invoke("hadoopConfiguration")
    fs <- sparklyr::invoke(spath, 'getFileSystem', hconf)
    
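    To sanity-check which filesystem the path resolved to, you can invoke getUri on the result (an optional step, not part of the original answer):

    # Optional check: the scheme should match the store you expect,
    # e.g. gs:// for Google Cloud Storage or hdfs:// for HDFS
    sparklyr::invoke(fs, "getUri") %>% sparklyr::invoke("toString")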

    Additionally, listLocatedStatus takes either a Path

    public org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
                                                                         throws FileNotFoundException,
                                                                                IOException
    

    or a Path and a PathFilter (note that this overload is protected):

    protected org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
                                                                                       PathFilter filter)
                                                                   throws FileNotFoundException,
                                                                          IOException
    

    So if you want to structure your code as shown above, you'll have to provide at least a path:

    sparklyr::invoke(fs, "listLocatedStatus", spath)
    
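    Keep in mind that listLocatedStatus returns a Java RemoteIterator, not an R list, so the result has to be drained manually with hasNext / next invokes. A minimal sketch, assuming the fs and spath objects created above:

    # Drain the RemoteIterator<LocatedFileStatus> into an R character vector
    it <- sparklyr::invoke(fs, "listLocatedStatus", spath)
    paths <- character(0)
    while (sparklyr::invoke(it, "hasNext")) {
      status <- sparklyr::invoke(it, "next")
      paths <- c(paths, sparklyr::invoke(status, "getPath") %>% sparklyr::invoke("toString"))
    }
    paths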

    In practice it might be easier to just get the FileSystem directly:

    fs <- invoke_static(sc, "org.apache.hadoop.fs.FileSystem", "get",  hconf)
    

    and use globStatus:

    lls <- invoke(fs, "globStatus", spath)
    

    where spath is a path with a wildcard, like:

    sparklyr::invoke_new(sc, 'org.apache.hadoop.fs.Path', "/some/path/*")
    

    The result will be an R list, which can be easily iterated:

    lls  %>%
        purrr::map(function(x) invoke(x, "getPath") %>% invoke("toString"))
    
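    Each element of that list is a FileStatus object, so other metadata can be pulled out the same way; for example, file sizes via the standard getLen accessor (a hedged extension of the snippet above):

    # Pair each path with its length in bytes
    lls %>%
        purrr::map(function(x) list(
            path = invoke(x, "getPath") %>% invoke("toString"),
            size = invoke(x, "getLen")
        ))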

    Credits:

    The answer to How can one list all csv files in an HDFS location within the Spark Scala shell? by @jaime

    Notes:

    • In general, if you interact with a non-trivial Java API, it makes much more sense to write your code in Java or Scala and provide a minimal R interface; even staying in R, the invoke plumbing can at least be hidden behind a single function, as in the sketch after this list.
    • For interactions with a specific object store it might be easier to use a dedicated package. For Google Cloud Storage you can take a look at googleCloudStorageR.
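
    As referenced in the first note, here is a minimal sketch of such a wrapper (the name hdfs_ls is hypothetical; the body only combines calls already shown in this answer):

    library(sparklyr)

    # Hypothetical wrapper around the globStatus-based listing shown above
    hdfs_ls <- function(sc, pattern) {
      # Resolve the FileSystem from Spark's Hadoop configuration
      hconf <- sc %>% spark_context() %>% invoke("hadoopConfiguration")
      fs <- invoke_static(sc, "org.apache.hadoop.fs.FileSystem", "get", hconf)
      # Expand the wildcard and return matching paths as a character vector
      spath <- invoke_new(sc, "org.apache.hadoop.fs.Path", pattern)
      invoke(fs, "globStatus", spath) %>%
        purrr::map_chr(function(x) invoke(x, "getPath") %>% invoke("toString"))
    }

    hdfs_ls(sc, "/some/path/*")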