I am using Spark 1.1. I have a Spark job that looks for a certain pattern of folders only under a bucket (i.e. folders that start with...) and should process only those. I achieve this by doing the following:
FileSystem fs = FileSystem.get(new Configuration(true));
FileStatus[] statusArr = fs.globStatus(new Path(inputPath));
List<FileStatus> statusList = Arrays.asList(statusArr);
List<String> pathsStr = convertFileStatusToPath(statusList);
JavaRDD<String> paths = sc.parallelize(pathsStr);
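(convertFileStatusToPath is just a small helper that turns the matched FileStatus entries into path strings, roughly along these lines:)
private static List<String> convertFileStatusToPath(List<FileStatus> statusList) {
    List<String> paths = new ArrayList<String>();
    for (FileStatus status : statusList) {
        paths.add(status.getPath().toString());
    }
    return paths;
}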
However, when running this job on a Google Cloud Storage path such as gs://rsync-1/2014_07_31* (using the latest Google Cloud Storage connector, 1.2.9), I get the following error:
14/10/13 10:28:38 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/10/13 10:28:38 INFO util.Utils: Successfully started service 'Driver' on port 60379.
14/10/13 10:28:38 INFO worker.WorkerWatcher: Connecting to worker akka.tcp://[email protected]:45212/user/Worker
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.IllegalArgumentException: Wrong bucket: rsync-1, in path: gs://rsync-1/2014_07_31*, expected bucket: hadoop-config
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.checkPath(GoogleHadoopFileSystem.java:100)
at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:294)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.makeQualified(GoogleHadoopFileSystemBase.java:457)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.getGcsPath(GoogleHadoopFileSystem.java:163)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1052)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1027)
at com.doit.customer.dataconverter.Phase0.main(Phase0.java:578)
... 6 more
When I ran this job on a local folder, everything worked fine.
hadoop-config is a bucket I use for deploying the Spark cluster on Google Compute Engine (using the bdutil 0.35.2 tool).
Short Answer
Instead of using:
FileSystem fs = FileSystem.get(new Configuration(true));
FileStatus[] statusArr = fs.globStatus(new Path(inputPath));
List<FileStatus> statusList = Arrays.asList(statusArr);
you need to do:
Path inputPathObj = new Path(inputPath);
FileSystem fs = FileSystem.get(inputPathObj.toUri(), new Configuration(true));
FileStatus[] statusArr = fs.globStatus(inputPathObj);
List<FileStatus> statusList = Arrays.asList(statusArr);
because in Hadoop, FileSystem instances are shared based on the scheme and authority components of the URI (and potentially user-group information in more advanced settings), and such instances are not interchangeable between schemes and authorities.
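You can see the mismatch directly by printing the URI each instance is bound to; a small sketch, assuming fs.defaultFS on the cluster is gs://hadoop-config/ (which is what bdutil sets up and what the error message indicates):
Configuration conf = new Configuration(true);
Path inputPathObj = new Path("gs://rsync-1/2014_07_31*");

FileSystem defaultFs = FileSystem.get(conf);
System.out.println(defaultFs.getUri());   // gs://hadoop-config (bound to fs.defaultFS)

FileSystem inputFs = FileSystem.get(inputPathObj.toUri(), conf);
System.out.println(inputFs.getUri());     // gs://rsync-1 (the instance that can glob the input)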
Long Answer
This has to do with the distinction between the hostname and path components of a URI in [scheme]://[authority]/[path], which may be more obvious in the HDFS use case, but is also applicable to GCS. Basically, there are several get methods in org.apache.hadoop.fs.FileSystem, and the most applicable ones here are:
public static FileSystem get(Configuration conf)
and
public static FileSystem get(URI uri, Configuration conf)
The former actually just calls the latter with:
return get(getDefaultUri(conf), conf);
where getDefaultUri(conf) is defined by fs.default.name or fs.defaultFS.
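So whatever fs.defaultFS (or the legacy fs.default.name) is set to decides which filesystem the no-argument overload hands back; the following sketch assumes the bdutil-style setting of gs://hadoop-config/ as the default filesystem:
Configuration conf = new Configuration(true);
// Assumed from the cluster config: fs.defaultFS = gs://hadoop-config/

URI defaultUri = FileSystem.getDefaultUri(conf);    // gs://hadoop-config/
FileSystem a = FileSystem.get(conf);                // same as the line below,
FileSystem b = FileSystem.get(defaultUri, conf);    // and returns the same cached instance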
The second consideration is that FileSystems with different hostname or authority components are considered to be inherently different filesystems; in the HDFS case, this makes sense, as:
FileSystem.get("hdfs://foo-cluster-namenode/", conf);
FileSystem.get("hdfs://bar-cluster-namenode/", conf);
each point at potentially completely different filesystem instances, on separate clusters, allowing the same pathnames to be used on the two separate HDFS instances to refer to separate storage namespaces. Though less transparent in terms of "hostnames" of machines, the bucket in GCS indeed takes on the role of the authority component of a GCS URI; in Hadoop, this means FileSystem.get literally returns the same cached Java FileSystem object when the bucket is the same, but different instances for different buckets. Just as you can't create an HDFS instance and point it at a different authority:
// Can't mix authorities!
FileSystem.get("hdfs://foo/", conf).listStatus(new Path("hdfs://bar/"));
when you called FileSystem.get(conf), you effectively got a cached instance pointed at gs://hadoop-config/, and then used that to try to list gs://rsync-1.
Instead, fetch the FileSystem instance at the time you know the Path you want to operate on:
FileSystem fs = FileSystem.get(myPath.toUri(), new Configuration(true));
fs.globStatus(myPath);
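Applied to your original snippet, the driver code would look something like this (your convertFileStatusToPath helper and the JavaSparkContext sc stay as they were):
Path inputPathObj = new Path(inputPath);
// Derive the FileSystem from the input path's own scheme/authority (the bucket),
// instead of from fs.defaultFS:
FileSystem fs = FileSystem.get(inputPathObj.toUri(), new Configuration(true));
FileStatus[] statusArr = fs.globStatus(inputPathObj);
List<FileStatus> statusList = Arrays.asList(statusArr);
List<String> pathsStr = convertFileStatusToPath(statusList);
JavaRDD<String> paths = sc.parallelize(pathsStr);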