Unable to find a sparklyr built-in for listing the contents of a directory via Spark, I am attempting to use invoke:
sc <- spark_connect(master = "yarn", config=config)
path <- 'gs:// ***path to bucket on google cloud*** '
spath <- sparklyr::invoke_new(sc, 'org.apache.hadoop.fs.Path', path)
fs <- sparklyr::invoke(spath, 'getFileSystem')
list <- sparklyr::invoke(fs, 'listLocatedStatus')
Error: java.lang.Exception: No matched method found for class org.apache.hadoop.fs.Path.getFileSystem
at sparklyr.Invoke.invoke(invoke.scala:134)
at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
at sparklyr.StreamHandler.read(stream.scala:66) ...
Note: Are there guidelines for reproducible examples with distributed code? I don't know how to make an example others could follow, given I am running against a particular Spark environment.
The getFileSystem method takes an org.apache.hadoop.conf.Configuration object as the first argument:
public FileSystem getFileSystem(Configuration conf) throws IOException
Return the FileSystem that owns this Path.
Parameters:
conf - the configuration to use when resolving the FileSystem
So the code to retrieve the FileSystem instance should look more or less like this:
# Retrieve Spark's Hadoop configuration
hconf <- sc %>% spark_context() %>% invoke("hadoopConfiguration")
fs <- sparklyr::invoke(spath, 'getFileSystem', hconf)
Additionally, listLocatedStatus takes either a Path:
public org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f) throws FileNotFoundException, IOException
or a Path and a PathFilter (note that this implementation is protected):
protected org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f, PathFilter filter) throws FileNotFoundException, IOException
So if you want to structure your code as shown above, you'll have to provide at least a path:
sparklyr::invoke(fs, "listLocatedStatus", spath)
In practice it might be easier to just get the FileSystem directly:
fs <- invoke_static(sc, "org.apache.hadoop.fs.FileSystem", "get", hconf)
and use globStatus:
lls <- invoke(fs, "globStatus", spath)
where spath is a path with a wildcard, like:
sparklyr::invoke_new(sc, 'org.apache.hadoop.fs.Path', "/some/path/*")
The result will be an R list, which can be easily iterated:
lls %>%
purrr::map(function(x) invoke(x, "getPath") %>% invoke("toString"))
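Putting the pieces together, here is a minimal sketch of a wrapper function. The name list_spark_paths and its arguments are hypothetical (this is not a sparklyr built-in), and I haven't run it against a GCS bucket. It resolves the FileSystem from the Path itself rather than through FileSystem.get, on the assumption that this is the safer choice when the path's scheme (e.g. gs://) differs from the cluster's default file system.

```r
# Hypothetical helper, not part of sparklyr: list paths matching a glob.
# `sc` is a sparklyr connection; `pattern` is a path that may contain wildcards.
list_spark_paths <- function(sc, pattern) {
  # Retrieve Spark's Hadoop configuration
  hconf <- sparklyr::invoke(sparklyr::spark_context(sc), "hadoopConfiguration")
  spath <- sparklyr::invoke_new(sc, "org.apache.hadoop.fs.Path", pattern)
  # Resolve the file system from the path, so the path's scheme is respected
  fs <- sparklyr::invoke(spath, "getFileSystem", hconf)
  # globStatus returns an array of FileStatus, which arrives in R as a list
  statuses <- sparklyr::invoke(fs, "globStatus", spath)
  # Convert each FileStatus to its path string
  vapply(
    statuses,
    function(x) sparklyr::invoke(sparklyr::invoke(x, "getPath"), "toString"),
    character(1)
  )
}
```

It would be called as list_spark_paths(sc, "/some/path/*") and should give back a character vector rather than a list, which is usually more convenient for filtering.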
Credits:
The answer to How can one list all csv files in an HDFS location within the Spark Scala shell? by @jaime
Notes:
googleCloudStorageR.